The Kafka Connect HTTP Sink connector is very useful for sending any message from a topic to a remote HTTP service via GET or POST. One use case in which I have used it is an integration between SAP and other corporate products through Kafka, using this connector to send the IDoc messages produced by those products to SAP through the HTTP XML-IDoc service that SAP provides.

In this post I will detail how to set up an integration between two products through a Kafka topic and a Kafka Connect HTTP Sink. In another post I will cover everything related to error management (both functional errors and transformation and general exceptions), plus a monitoring and remediation solution, as automated as possible, for service unavailability. An outline of the general solution is as follows:

The producer will write messages to the topic 'test-topic', and the HTTP Sink connector 'connect-http-sink-simple' will send them to the 'listenerhttp-node' through HTTP requests using the POST method.

The message-producing application will be simulated with a producer implemented with the Kafka CLI scripts.

The consuming application will be an HTTP listener implemented with Node.js that handles HTTP POST requests and processes them.

In addition, a topic will be set up where the connector will send all the messages that it could not deliver to the HTTP listener due to an error (not all errors, which is very important); this is the topic called 'test-topic-error' in the previous diagram.

The Kafka infrastructure will be deployed using Confluent Kafka in Docker containers. On this infrastructure we will modify the 'connect' container, removing the Kafka Connect Datagen it ships with and installing the Kafka Connect HTTP Sink instead. These containers are managed with docker-compose, which leaves you one step away from being able to easily deploy it all in Kubernetes.

First of all, we need to have the following software on our computer:

  • nodejs:  v10.16.2
  • docker: Docker version 18.09.3, build 774a1f4
  • docker-compose: docker-compose version 1.18.0, build 8dd22a9
  • kafka_cli: 2.12-2.3.0

To build the consumer with Node.js, we create a directory 'listenerhttp-node', enter it, and execute the command 'npm init', which, after asking us some questions, will generate a package.json file like the following:
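A minimal sketch of what npm init generates, assuming the default answers to its questions (the exact name, version and description may differ in your case):

{
  "name": "listenerhttp-node",
  "version": "1.0.0",
  "description": "HTTP listener that receives POST requests from the Kafka Connect HTTP Sink",
  "main": "index.js",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "author": "",
  "license": "ISC"
}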

Now we create the index.js file, where we define the HTTP listener that receives the POST requests with the messages coming from the topic through the Kafka Connect HTTP Sink:

const http = require('http');

// Minimal HTTP listener: logs the POST bodies sent by the Kafka Connect HTTP Sink
http.createServer(function (req, res) {
  console.log('..' + req.method + ' method invoked');
  if (req.method === 'POST') {
    let body = '';
    // Accumulate the request body chunks
    req.on('data', chunk => {
      body += chunk.toString();
    });
    // When the full body has arrived, log it and acknowledge with 'ok'
    req.on('end', () => {
      console.log('..' + body);
      res.end('ok');
    });
  } else {
    res.writeHead(200, {'Content-Type': 'text/plain'});
    res.write('Please, send a POST request');
    res.end();
  }
}).listen(8080);
console.log('..Listener http serving by port 8080');

And from the same directory we run the server with the command 'node index.js':

We now have the listener serving at http://localhost:8080. We can check it with a call from Postman like the one shown in the image:
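If you prefer the command line, an equivalent check can be made with curl (the message body here is arbitrary):

curl -X POST -H "Content-Type: text/plain" -d "hello listener" http://localhost:8080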

And the correct processing can be seen in the 'ok' response and in the HTTP listener's console output:

Now we are going to set up the entire Confluent Kafka ecosystem. To do so, we go to its download page https://www.confluent.io/download/ and, at the bottom, download the free commercial version:

Among the download options it offers for Confluent Kafka, select 'Docker images':

When you press the 'Download Version 5.3.1' button (this is the latest version as I write this; the version will change over time, but the process will be similar), it takes us to the following page:

Click on 'Docker images' and it takes us to hub.docker.com. From this list of containers we will use the following, which make up a very complete Kafka ecosystem (not all of them are necessary to demonstrate the operation of the HTTP Sink connector):

  • confluentinc/cp-zookeeper
  • confluentinc/cp-enterprise-kafka
  • confluentinc/cp-schema-registry
  • confluentinc/cp-enterprise-control-center
  • confluentinc/cp-kafka-rest
  • confluentinc/cp-kafka-connect-base
  • confluentinc/cp-ksql-server
  • confluentinc/cp-ksql-cli

To do this we create a kafka531 directory, and within it the following docker-compose.yml file, which specifies the configuration to download these images and run the containers:

---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.3.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-enterprise-kafka:5.3.1
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  schema-registry:
    image: confluentinc/cp-schema-registry:5.3.1
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'

  connect:
    image: confluentinc/kafka-connect-http-sink
    build:
      context: .
      dockerfile: Dockerfile
    hostname: connect
    container_name: connect
    depends_on:
      - zookeeper
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    extra_hosts:
      - "dockerhost:$DOCKERHOST"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.3.1.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR

  control-center:
    image: confluentinc/cp-enterprise-control-center:5.3.1
    hostname: control-center
    container_name: control-center
    depends_on:
      - zookeeper
      - broker
      - schema-registry
      - connect
      - ksql-server
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONTROL_CENTER_CONNECT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_KSQL_URL: "http://ksql-server:8088"
      CONTROL_CENTER_KSQL_ADVERTISED_URL: "http://localhost:8088"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021

  rest-proxy:
    image: confluentinc/cp-kafka-rest:5.3.1
    depends_on:
      - zookeeper
      - broker
      - schema-registry
    ports:
      - 8082:8082
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'

  ksql-server:
    image: confluentinc/cp-ksql-server:5.3.1
    hostname: ksql-server
    container_name: ksql-server
    depends_on:
      - broker
      - connect
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_LOG4J_OPTS: "-Dlog4j.configuration=file:/etc/ksql/log4j-rolling.properties"
      KSQL_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_HOST_NAME: ksql-server
      KSQL_APPLICATION_ID: "cp-all-in-one"
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"

  ksql-cli:
    image: confluentinc/cp-ksql-cli:5.3.1
    container_name: ksql-cli
    depends_on:
      - broker
      - connect
      - ksql-server
    entrypoint: /bin/sh
    tty: true

This file specifies the creation of 8 containers for different Confluent Kafka services; their function can be inferred from their names, and I summarize them below:

  • zookeeper: The component responsible for managing all configuration and synchronization. It is in charge of relating consumers with brokers.
  • broker: The topic manager. It listens on port 9092 for requests external to the Kafka ecosystem, and on 29092 for internal requests between Kafka components, such as the connect HTTP Sink of this example.
  • schema-registry: The registry for typing messages with Avro schemas.
  • connect: The Kafka Connect worker with the HTTP Sink that we set up for this example. It listens on port 8083.
  • control-center: Provides a web console on port 9021 to manage the system.
  • rest-proxy: Provides an HTTP REST API that allows integration with Kafka topics, for example to produce messages (see the sketch after this list).
  • ksql-server: Provides a service that allows real-time processing with an SQL-like interface.
  • ksql-cli: A client for the ksql-server service.
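As an illustration of the rest-proxy (not needed for the HTTP Sink example itself), once everything is up, a message could be produced to a topic with a call like this sketch, which assumes the v2 JSON format of the Confluent REST Proxy:

curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" \
  --data '{"records":[{"value":{"greeting":"hello from rest-proxy"}}]}' \
  http://localhost:8082/topics/test-topic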

In addition to the docker-compose.yml file, we create the following two files, whose contents I copy below:

  • Dockerfile: File with Docker commands to create the connect container with the HTTP Sink connector from the image 'confluentinc/cp-kafka-connect-base:5.3.1':
FROM confluentinc/cp-kafka-connect-base:5.3.1

ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"

RUN confluent-hub install --no-prompt confluentinc/kafka-connect-http:latest

ADD log4j.properties.template /etc/confluent/docker/log4j.properties.template

  • log4j.properties.template: A change in the template to leave the HTTP Sink connector logs in the /tmp directory of the container with level 'INFO':
log4j.rootLogger={{ env["CONNECT_LOG4J_ROOT_LOGLEVEL"] | default('INFO') }}, logFile, stdout

log4j.appender.logFile=org.apache.log4j.DailyRollingFileAppender
log4j.appender.logFile.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.logFile.File=/tmp/connect-worker.log
log4j.appender.logFile.layout=org.apache.log4j.PatternLayout
log4j.appender.logFile.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n

{% set default_loggers = {
	'org.reflections': 'ERROR',
	'org.apache.zookeeper': 'ERROR',
	'org.I0Itec.zkclient': 'ERROR'
} -%}

{% if env['CONNECT_LOG4J_LOGGERS'] %}
# loggers from CONNECT_LOG4J_LOGGERS env variable
{% set loggers = parse_log4j_loggers(env['CONNECT_LOG4J_LOGGERS']) %}
{% else %}
# default log levels
{% set loggers = default_loggers %}
{% endif %}
{% for logger,loglevel in loggers.iteritems() %}
log4j.logger.{{logger}}={{loglevel}}
{% endfor %}

Before executing, we review the Docker images we have. First we check whether any containers are running from images named 'confluentinc/*:5.3.1' with the command:

docker ps

If it returns any container belonging to images named 'confluentinc/*:5.3.1', we stop them with:

docker container stop CONTAINER_ID 

Then we remove the stopped containers (note that this command removes all stopped containers, not only the Confluent ones):

docker rm $(docker ps -a -q)

And then we can delete all the images of the 'confluentinc/*' repository with TAG 5.3.1 to avoid "interference":

docker rmi $(docker images -q "confluentinc/*:5.3.1")

To boot everything we create a startAllKafka.sh file. It first sets a DOCKERHOST environment variable, referenced in the docker-compose.yml file, so that the connect container can use the name 'dockerhost' to reach the host IP where 'listenerhttp-node' runs (a service on the host, not inside the container network that docker-compose creates when it runs). It then executes 'docker-compose up -d', which processes all the containers defined in the docker-compose.yml file:

#!/bin/bash
# Obtain the first non-loopback IP of the host, so containers can reach host services via 'dockerhost'
export DOCKERHOST=$(ifconfig | grep -E "([0-9]{1,3}\.){3}[0-9]{1,3}" | grep -v 127.0.0.1 | awk '{ print $2 }' | cut -f2 -d: | head -n1)
docker-compose up -d

When this 'docker-compose up -d' command is executed, the base images are downloaded and then the containers are started, all on the same network since they run within the same docker-compose command, with the configuration specified in the yml file and a result like the following:

[operatorfeitam@localhost kafka531]$ ls -la
total 32
drwxrwxr-x.  2 operatorfeitam operatorfeitam 4096 Oct 10 19:52 .
drwxrwxr-x. 14 operatorfeitam operatorfeitam 4096 Oct  9 11:27 ..
-rw-rw-r--.  1 operatorfeitam operatorfeitam 5722 Oct 10 19:31 docker-compose.yml
-rw-rw-r--.  1 operatorfeitam operatorfeitam  859 Oct  9 21:16 Dockerfile
-rw-rw-r--.  1 operatorfeitam operatorfeitam 1040 Oct  8 12:24 log4j.properties.template
-rwxrwxr-x.  1 operatorfeitam operatorfeitam  165 Oct 10 19:48 setEnvironmentVariables.sh
-rwxr-xr-x.  1 operatorfeitam operatorfeitam  169 Oct 10 19:52 startAllKafka.sh
[operatorfeitam@localhost kafka531]$ ./startAllKafka.sh
Pulling zookeeper (confluentinc/cp-zookeeper:5.3.1)...
5.3.1: Pulling from confluentinc/cp-zookeeper
b0b8081c21b9: Pull complete
38f81ce68d4d: Pull complete
b1edc2170bc8: Pull complete
2e4921c3950d: Pull complete
ea4a4c6571a0: Pull complete
Digest: sha256:ce26ef6c7598087cb8306fcf184bd5b3ef41719571a8f246b5b86b2eb240d65a
Status: Downloaded newer image for confluentinc/cp-zookeeper:5.3.1
Pulling broker (confluentinc/cp-enterprise-kafka:5.3.1)...
5.3.1: Pulling from confluentinc/cp-enterprise-kafka
b0b8081c21b9: Already exists
38f81ce68d4d: Already exists
b1edc2170bc8: Already exists
3e184556b52d: Pull complete
0637e0f67a54: Pull complete
6f0d2bf07b0f: Pull complete
449e9b63a803: Pull complete
Digest: sha256:0e3f485b6cf2f9dc37007ee206dc19147a5698eaaaebb3bca20dfea387a67514
Status: Downloaded newer image for confluentinc/cp-enterprise-kafka:5.3.1
Pulling schema-registry (confluentinc/cp-schema-registry:5.3.1)...
5.3.1: Pulling from confluentinc/cp-schema-registry
b0b8081c21b9: Already exists
38f81ce68d4d: Already exists
b1edc2170bc8: Already exists
a78f31ee19df: Pull complete
f12c63c83f73: Pull complete
Digest: sha256:f696ed4b535257c76c3d908bf4e7c0ea2abdd19c5c85edd6364bdf16cf742809
Status: Downloaded newer image for confluentinc/cp-schema-registry:5.3.1
Pulling rest-proxy (confluentinc/cp-kafka-rest:5.3.1)...
5.3.1: Pulling from confluentinc/cp-kafka-rest
b0b8081c21b9: Already exists
38f81ce68d4d: Already exists
b1edc2170bc8: Already exists
fd15f2b03154: Pull complete
3bdf4efe8820: Pull complete
Digest: sha256:70bc65f6db106d22002ad54e66cac5083570e5b482e6c4554fb23d3013398c56
Status: Downloaded newer image for confluentinc/cp-kafka-rest:5.3.1
Building connect
Step 1/4 : FROM confluentinc/cp-kafka-connect-base:5.3.1
5.3.1: Pulling from confluentinc/cp-kafka-connect-base
b0b8081c21b9: Already exists
38f81ce68d4d: Already exists
b1edc2170bc8: Already exists
3e184556b52d: Already exists
0637e0f67a54: Already exists
a70e661df81d: Pull complete
fd8e3300346b: Pull complete
Digest: sha256:697f368853e5b7dcf89e685bcd174712883ef9dd22658ef975e5909da282ad86
Status: Downloaded newer image for confluentinc/cp-kafka-connect-base:5.3.1
 ---> 74bdc4d9f804
Step 2/4 : ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"
 ---> Running in 821feb641780
Removing intermediate container 821feb641780
 ---> f451b7e7135d
Step 3/4 : RUN confluent-hub install --no-prompt confluentinc/kafka-connect-http:latest
 ---> Running in 8f8febacb922
Running in a "--no-prompt" mode 
Implicit acceptance of the license below:  
Confluent Software Evaluation License 
https://www.confluent.io/software-evaluation-license 
Downloading component Kafka Connect Http Sink Connector 1.0.3, provided by Confluent, Inc. from Confluent Hub and installing into /usr/share/confluent-hub-components 
Adding installation directory to plugin path in the following files: 
  /etc/kafka/connect-distributed.properties 
  /etc/kafka/connect-standalone.properties 
  /etc/schema-registry/connect-avro-distributed.properties 
  /etc/schema-registry/connect-avro-standalone.properties 
 
Completed 
Removing intermediate container 8f8febacb922
 ---> df83dcea885a
Step 4/4 : ADD log4j.properties.template /etc/confluent/docker/log4j.properties.template
 ---> 4ac36b0fd28d
Successfully built 4ac36b0fd28d
Successfully tagged confluentinc/kafka-connect-http-sink:latest
WARNING: Image for service connect was built because it did not already exist. To rebuild this image you must use `docker-compose build` or `docker-compose up --build`.
Pulling ksql-server (confluentinc/cp-ksql-server:5.3.1)...
5.3.1: Pulling from confluentinc/cp-ksql-server
a4d8138d0f6b: Pull complete
6d0181143a32: Pull complete
0a60585530f9: Pull complete
bce16161d878: Pull complete
393b6b922701: Pull complete
6c3f8cc3ae1e: Pull complete
0540dc29e89c: Pull complete
e0441f18d593: Pull complete
de2c35bbde1c: Pull complete
7bb8238f726f: Pull complete
6c9eef97a714: Pull complete
aad620a23079: Pull complete
fbc187b7292e: Pull complete
e82fa8159e77: Pull complete
ea138fdae396: Pull complete
a31cf463b757: Pull complete
9fe13b613062: Pull complete
7d4d203f6b88: Pull complete
4b27f61b3727: Pull complete
14044d281d8e: Pull complete
b40a9d398f7e: Pull complete
55882a05adfd: Pull complete
Digest: sha256:0982241bbe585ddb5af3ea38d1c6d55bcae88f74aff20c9bf39318d09b435916
Status: Downloaded newer image for confluentinc/cp-ksql-server:5.3.1
Pulling control-center (confluentinc/cp-enterprise-control-center:5.3.1)...
5.3.1: Pulling from confluentinc/cp-enterprise-control-center
b0b8081c21b9: Already exists
38f81ce68d4d: Already exists
b1edc2170bc8: Already exists
efa5d78d426f: Pull complete
ee66d7c30c2c: Pull complete
Digest: sha256:988612aef391d92fa6c079d49add87dc8371581f2745f907fa078bc49a1924e4
Status: Downloaded newer image for confluentinc/cp-enterprise-control-center:5.3.1
Pulling ksql-cli (confluentinc/cp-ksql-cli:5.3.1)...
5.3.1: Pulling from confluentinc/cp-ksql-cli
a4d8138d0f6b: Already exists
6d0181143a32: Already exists
0a60585530f9: Already exists
bce16161d878: Already exists
393b6b922701: Already exists
6c3f8cc3ae1e: Already exists
0540dc29e89c: Already exists
e0441f18d593: Already exists
de2c35bbde1c: Already exists
7bb8238f726f: Already exists
6c9eef97a714: Already exists
aad620a23079: Already exists
f7a7b7de57fb: Pull complete
84627795fa4b: Pull complete
Creating zookeeper ... done
Creating broker ... done
Creating schema-registry ... done
0d78c77ea313: Pull complete
Creating connect ... done
Creating ksql-server ... done
Status: Downloaded newer image for confluentinc/cp-ksql-cli:5.3.1
Creating control-center ... done
Creating broker ... 
Creating schema-registry ... 
Creating rest-proxy ... 
Creating connect ... 
Creating ksql-server ... 
Creating control-center ... 
Creating ksql-cli ... 
[operatorfeitam@localhost kafka531]$ 

You can check the running containers with the command:

docker ps

That should give a result similar to:

To stop the containers we can execute the following command (or stop each container individually with 'docker container stop CONTAINER_ID' as shown before):

docker container stop $(docker container ls -a -q -f "label=io.confluent.docker") 

The 'confluentinc/kafka-connect-http-sink' image, whose container we have named 'connect' in the docker-compose.yml file, is based on the image 'confluentinc/cp-kafka-connect-base:5.3.1' with the Kafka HTTP Sink connector added. Let's look at the definition of the 'connect' container in the docker-compose.yml file:

  connect:
    image: confluentinc/kafka-connect-http-sink
    build:
      context: .
      dockerfile: Dockerfile
    hostname: connect
    container_name: connect
    depends_on:
      - zookeeper
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    extra_hosts:
      - "dockerhost:$DOCKERHOST"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.3.1.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR

This definition of 'connect' tells docker-compose to build an image called 'confluentinc/kafka-connect-http-sink' from the Dockerfile we have placed in the same directory. In that Dockerfile we specify the following:

  • Use the base image confluentinc/cp-kafka-connect-base:5.3.1.
  • Set the CONNECT_PLUGIN_PATH environment variable, which specifies where installed plugins are searched for.
  • Install the kafka-connect-http connector, which lands in /usr/share/confluent-hub-components.
  • Replace the log4j template so that a log is generated in /tmp/connect-worker.log (I believe this is not necessary if an environment variable with the desired log level is passed to the container).

In this same image you can install more connectors, or set up other similar containers for other Kafka connectors. I recommend separating connectors into different containers to avoid problems when parameterizing them through their properties, since the properties of different connectors can overlap; always take the system as the primary reference.


The image 'confluentinc/cp-kafka-connect-base:5.3.1' starts Kafka Connect in distributed mode, which is appropriate for production environments. But if instead of the enterprise broker 'confluentinc/cp-enterprise-kafka' we use the community 'confluentinc/cp-kafka', I believe (I have not verified it) that the connectors can only work in standalone mode. To switch to standalone mode you must change, in the 'confluentinc/cp-kafka-connect-base:5.3.1' container, the last line of the file /etc/confluent/docker/launch from:

exec connect-distributed /etc/"${COMPONENT}"/"${COMPONENT}".properties

to:

exec connect-standalone /etc/"${COMPONENT}"/"${COMPONENT}".properties

It is very important to know that in 'confluentinc/cp-kafka-connect-base:5.3.1' the entire configuration is applied when the container starts by executing /etc/confluent/docker/run, and this script is the one that calls the /etc/confluent/docker/launch discussed above. Therefore, any desired change in the behavior of the connector that cannot be made through its properties file or through environment variables must be analyzed and made in this execution of /etc/confluent/docker/run.

At this point we have everything running, with the HTTP Sink connector installed. But we still don't have any HTTP Sink connector task running.

Kafka Connect provides a REST API through which, among other things, we can obtain:

  • Installed connector plugins: http://localhost:8083/connector-plugins
  • Configured connectors: http://localhost:8083/connectors
  • Detail of a configured connector: http://localhost:8083/connectors/{name_connector}

Since there is no connector configured yet, the call to http://localhost:8083/connectors should return an empty list.
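We can check it from the command line, for example:

curl http://localhost:8083/connectors

At this point it should return an empty JSON array, [].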

Before creating the 'connect-http-sink-simple' connector configuration, we will create the topic 'test-topic' by executing the following producer and consumer commands in two different consoles using the Kafka CLI scripts.

Producer command (it is this producer command that actually creates the topic if it does not exist, not the consumer command):

./kafka-console-producer.sh --broker-list localhost:9092  --topic test-topic

Consumer command:

./kafka-console-consumer.sh --bootstrap-server localhost:9092  --topic test-topic

At the producer's prompt we type something and press Enter, and it should appear on the consumer:
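Alternatively, instead of relying on the broker's automatic topic creation, the topic could be created explicitly with the Kafka CLI; a sketch:

./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test-topic --partitions 1 --replication-factor 1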

To configure a connector, in our case 'connect-http-sink-simple', we have two options:

  • Leave a connect-http-sink-simple.properties file in the path /etc/kafka-connect of the image 'confluentinc/kafka-connect-http-sink' and modify the last line of the file /etc/confluent/docker/launch of that image, appending "/etc/kafka-connect/connect-http-sink-simple.properties" as another parameter of 'exec connect-distributed' (without removing anything the line already has). In this case you have to stop the container and start it again.
  • Make a POST call, for example with Postman or curl, to http://localhost:8083/connectors, setting the 'Content-Type' header to 'application/json' and sending the following content with the desired connector configuration:
{
    "name": "connect-http-sink-simple",
    "config": {
        "connector.class": "io.confluent.connect.http.HttpSinkConnector",
        "tasks.max": "1",
        "topics": "test-topic",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "confluent.topic.bootstrap.servers": "broker:29092",
        "confluent.topic.replication.factor": "1",
        "http.api.url": "http://dockerhost:8080",
        "max.retries": "10",
        "retry.backoff.ms": "30000",
        "confluent.license": "",
        "errors.retry.delay.max.ms": "60000",
        "errors.retry.timeout": "10000",
        "errors.tolerance": "all",
        "errors.log.enable": "true",
        "errors.log.include.messages": "true",
        "errors.deadletterqueue.topic.name": "test-topic-error",
        "errors.deadletterqueue.topic.replication.factor": "1",
        "errors.deadletterqueue.context.headers.enable": "true"
    } 
}
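A sketch of this call with curl, assuming the JSON above is saved in a file named connect-http-sink-simple.json (any file name works):

curl -X POST -H "Content-Type: application/json" \
  --data @connect-http-sink-simple.json \
  http://localhost:8083/connectors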

And the response should be an HTTP '201 Created' code with:

{
    "name": "connect-http-sink-simple",
    "config": {
        "connector.class": "io.confluent.connect.http.HttpSinkConnector",
        "tasks.max": "1",
        "topics": "test-topic",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "confluent.topic.bootstrap.servers": "broker:29092",
        "confluent.topic.replication.factor": "1",
        "http.api.url": "http://dockerhost:8080",
        "max.retries": "10",
        "retry.backoff.ms": "30000",
        "confluent.license": "",
        "errors.retry.delay.max.ms": "60000",
        "errors.retry.timeout": "10000",
        "errors.tolerance": "all",
        "errors.log.enable": "true",
        "errors.log.include.messages": "true",
        "errors.deadletterqueue.topic.name": "test-topic-error",
        "errors.deadletterqueue.topic.replication.factor": "1",
        "errors.deadletterqueue.context.headers.enable": "true",
        "name": "connect-http-sink-simple"
    },
    "tasks": [
        {
            "connector": "connect-http-sink-simple",
            "task": 0
        }
    ],
    "type": "sink"
}

If the 'tasks' element has content, everything went well; otherwise you have to enter the container with 'docker exec -it connect /bin/bash' and check the log in /tmp/connect-worker.log.

Another way to see the connector is by calling http://localhost:8083/connectors/connect-http-sink-simple.

To remove the connector (not the HTTP Sink plugin) you can make an HTTP call with the DELETE method to the URL http://localhost:8083/connectors/connect-http-sink-simple, and it will return an HTTP '204 No Content' code.
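For example, with curl:

curl -X DELETE http://localhost:8083/connectors/connect-http-sink-simple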

As soon as we create the connector, we can see in the 'listenerhttp-node' console that the messages written prior to the creation of the HTTP Sink connector have now been delivered to 'listenerhttp-node', because the connector is another consumer of the topic that had not yet consumed those messages:

Now we can write new messages in the producer and we will see that they appear in the consumer and that the 'connect-http-sink-simple' connector sends them to 'listenerhttp-node':

We send several more messages and confirm the behavior.

So far we have not had errors in our 'listenerhttp-node', but we can verify in the Confluent Kafka Control Center (available over HTTP on port 9021) that the 'test-topic-error' topic has been created. By default the broker is configured to create topics that do not exist when a producer references them, and since the 'connect-http-sink-simple' connector is configured to send errors to the topic 'test-topic-error', it acts as a producer of that topic and the topic gets created. A screenshot of this:

From this Control Center (remember, it is part of the enterprise version) you can perform many actions, such as:

  • See the status and general statistics of the brokers.
  • See and create topics. You can leave the message page of a topic open and, while it is open, watch the messages of that topic.
  • Create connectors: although for this you have to edit the Control Center configuration file where the connection to 'connect' is specified; a priori, in the version I have tried, you need to prepend 'http://' to 'connect:8083' and restart the Control Center.
  • View the KSQL servers and execute KSQL queries as a client.
  • See consumer groups.
  • View and define the configuration of the Kafka cluster and brokers.

Now we run a consumer of the 'test-topic-error' topic in a console to check whether any error messages appear:

./kafka-console-consumer.sh --bootstrap-server localhost:9092  --topic test-topic-error

And we will see that there are no messages, since no message has failed yet.

The error management defined in the connector (the properties that appear after "confluent.license": "") applies to errors in conversion or transformation within the connector. Other exceptions are not managed: the connector task stops and is left in the 'failed' state. We can check this by stopping 'listenerhttp-node' and sending messages from the producer. We will see that the console consumer receives the messages, but 'listenerhttp-node' does not, because it is stopped. If we bring the listener up again, what will we see? That the previous messages not yet received by 'listenerhttp-node' are not processed, and that new producer messages are not processed either, even though 'listenerhttp-node' is up again. This is because when the connector detects any exception (outside conversion or transformation) it stops and remains in the 'failed' state. But do not worry: the messages have not been lost; they remain in the topic, pending processing by the 'connect-http-sink-simple' connector.

The status of the connector can be checked through the API by making a GET call to /connectors/{name_connector}/status.
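A sketch of what the response looks like for our connector (the worker_id will vary with your deployment):

{
    "name": "connect-http-sink-simple",
    "connector": {
        "state": "RUNNING",
        "worker_id": "connect:8083"
    },
    "tasks": [
        {
            "id": 0,
            "state": "RUNNING",
            "worker_id": "connect:8083"
        }
    ],
    "type": "sink"
}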

In this example we see that the connector is in the 'RUNNING' state and that it has a single task, '0', also in the 'RUNNING' state.

Now we stop 'listenerhttp-node' and produce some messages. We wait a while for the retries and backoff configured in the properties to be exhausted. Then we access the 'connect' container with the command:

docker exec -it connect /bin/bash

Once inside the container we execute the following:

cd /tmp
ls -la
tail -1000f connect-worker.log

And in the log we will see the error saying that it cannot connect to the stopped 'listenerhttp-node':

We see the error and that the task has stopped. And because it is stopped, even if we start 'listenerhttp-node' now, neither the previous messages pending processing nor those produced after starting 'listenerhttp-node' are processed.

Calling the API we will check the status of the connector and its task:

And the state of the task:
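A sketch of what the task status looks like in this situation, with the state 'FAILED' and a trace of the exception (abridged here):

{
    "id": 0,
    "state": "FAILED",
    "worker_id": "connect:8083",
    "trace": "org.apache.kafka.connect.errors.ConnectException: ..."
}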

Now we have two options:

  • Remove and re-create the 'connect-http-sink-simple' connector as detailed before. When you create it again it starts automatically.
  • Restart the task with a POST call to http://localhost:8083/connectors/connect-http-sink-simple/tasks/0/restart.

I prefer to restart the task; if it returns an HTTP 204, the task has been restarted (and the connector will also be in the 'running' state), and the pending messages are processed without being lost.
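For example, with curl:

curl -X POST http://localhost:8083/connectors/connect-http-sink-simple/tasks/0/restart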

It is very important to know that a Kafka connector manages the offset of the last message processed, so that it can be resumed and continue where it stopped, for example after an error. In another post I will detail how a Kafka connector is implemented, where this offset management for correct resumption will be seen.

Knowing how all this error management works, in a production system you can set up monitoring of the target services of the sink connectors, of the connector logs, and of the status of the connectors and their tasks, in order to perform the required actions and automatically restart the tasks of connectors in the 'failed' state. In another post I will detail a possible solution for this and the capabilities Prometheus provides to manage it all.

I leave all the code of this example on GitHub:

https://github.com/jlsvfeitam/kafka-connect-http-sink-first-steps

Related and complementary information: