Kafka Connect - Distributed Worker

About

Distributed mode is the runtime mode of Connect when running/starting a worker as part of a cluster of workers.

Management

Metadata (Internal topics)

See Kafka Connect - Storage Topics

Start

Command line

bin/connect-distributed worker.properties

# Example - Configuration that works well with Kafka and Schema Registry services running locally with a single broker
$ ./bin/connect-distributed ./etc/schema-registry/connect-avro-distributed.properties

where:

  • worker.properties is the configuration file

New workers will either start a new group or join an existing one based on the worker properties provided.
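
A minimal sketch of what such a worker.properties might contain in distributed mode (the broker address, group id, and topic names below are illustrative, not defaults):

# Kafka brokers to bootstrap from
bootstrap.servers=localhost:9092
# Workers with the same group.id form one Connect cluster
group.id=connect-cluster
# Internal storage topics (must match on every worker of the group)
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
# Converters used for record keys and values
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter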

The connector configuration is not given at the command line because coordination between the workers is required. See the Connector section below.

Docker

docker run -d \
  --name=kafka-connect \
  --net=host \
  -e CONNECT_PRODUCER_INTERCEPTOR_CLASSES=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor \
  -e CONNECT_CONSUMER_INTERCEPTOR_CLASSES=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor \
  -e CONNECT_BOOTSTRAP_SERVERS=localhost:29092 \
  -e CONNECT_REST_PORT=28082 \
  -e CONNECT_GROUP_ID="quickstart" \
  -e CONNECT_CONFIG_STORAGE_TOPIC="quickstart-config" \
  -e CONNECT_OFFSET_STORAGE_TOPIC="quickstart-offsets" \
  -e CONNECT_STATUS_STORAGE_TOPIC="quickstart-status" \
  -e CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1 \
  -e CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1 \
  -e CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1 \
  -e CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
  -e CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
  -e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
  -e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
  -e CONNECT_REST_ADVERTISED_HOST_NAME="localhost" \
  -e CONNECT_LOG4J_ROOT_LOGLEVEL=DEBUG \
  -e CONNECT_LOG4J_LOGGERS=org.reflections=ERROR \
  -e CONNECT_PLUGIN_PATH=/usr/share/java \
  -v /tmp/quickstart/file:/tmp/quickstart \
  confluentinc/cp-kafka-connect:4.0.0
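
Once the container is up, the worker can be checked through its REST interface (28082 is the REST port set above):

# List the connector plugins installed on this worker
curl http://localhost:28082/connector-plugins
# List the deployed connectors (empty on a fresh worker)
curl http://localhost:28082/connectors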

Failure

When a worker fails, tasks are rebalanced across the active workers. When a task fails, no rebalance is triggered as a task failure is considered an exceptional case.

If you add a worker, shut down a worker, or a worker fails unexpectedly, the rest of the workers detect this and automatically coordinate to redistribute connectors and tasks across the updated set of available workers.
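
Since a failed task is not recovered by a rebalance, it has to be restarted explicitly, for example through the REST API (the connector name and task id below are illustrative):

# Restart task 0 of the connector named local-file-sink
curl -X POST http://localhost:8083/connectors/local-file-sink/tasks/0/restart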

Worker

See Kafka - Connect

Cluster

Distributed workers that are configured with matching group.id values in their configuration files automatically discover each other and form a cluster.

All worker configurations must have matching:

  • config.storage.topic
  • offset.storage.topic
  • status.storage.topic

Locally

In distributed mode, if you run more than one worker per host (for example, if you are testing distributed mode locally during development), the following settings must have different values for each instance:

  • rest.port - the port the REST interface listens on for HTTP requests
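
For instance, a second worker on the same host can reuse the same configuration with only the REST port changed (worker2.properties is a hypothetical copy of worker.properties with rest.port=8084):

# Terminal 1: first worker on the default REST port 8083
bin/connect-distributed worker.properties
# Terminal 2: second worker, same group.id and storage topics, but rest.port=8084
bin/connect-distributed worker2.properties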

Connector

Creation

Rest

To create a connector, you start the workers and then make a REST request to a worker to create and manage the connector.

Example: a FileStreamSinkConnector where the worker has been started on localhost

curl -X POST -H "Content-Type: application/json" --data @config.json http://localhost:8083/connectors

or inline:

curl -X POST -H "Content-Type: application/json" --data '{"name": "local-file-sink", "config": {"connector.class":"FileStreamSinkConnector", "tasks.max":"1", "file":"test.sink.txt", "topics":"connect-test" }}' http://localhost:8083/connectors
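
The config.json file referenced above would contain the same payload as the inline variant:

{
  "name": "local-file-sink",
  "config": {
    "connector.class": "FileStreamSinkConnector",
    "tasks.max": "1",
    "file": "test.sink.txt",
    "topics": "connect-test"
  }
}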

Use the REST API to deploy and manage the connectors when running in distributed mode. See connect-managing-distributed-mode

Confluent cli

With the Confluent CLI, you can load a bundled connector with a predefined name or a custom connector with a given configuration.

Upon success it will print the connector’s configuration.

confluent load [<connector-name> [-d <connector-config-file>]]

where:

  • connector-name is the name (id) of the connector. If a name is also defined in the config file, it is overwritten by the command-line value.
  • connector-config-file can be a properties or JSON file. If not specified, it defaults to /etc/kafka/connect-<connector-name>.properties.

Example:

confluent load file-source
# same as 
confluent load file-source -d /etc/kafka/connect-file-source.properties
{
  "name": "file-source",
  "config": {
    "connector.class": "FileStreamSource",
    "tasks.max": "1",
    "file": "test.txt",
    "topic": "connect-test",
    "name": "file-source"
  },
  "tasks": []
}

Status

Connectors and their tasks publish status updates to a shared topic (configured with status.storage.topic) which all workers in the cluster monitor.

See Kafka Connect - REST API for the meaning of each state.

# All connectors
confluent status connectors
# One connector
confluent status connectorName
{
  "name": "connectorName",
  "connector": {
    "state": "RUNNING",
    "worker_id": "127.0.0.1:8083"
  },
  "tasks": [
    {
      "state": "RUNNING",
      "id": 0,
      "worker_id": "127.0.0.1:8083"
    }
  ]
}
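
The same status is also available directly from the REST API:

# Equivalent REST call against a worker (default REST port 8083)
curl http://localhost:8083/connectors/connectorName/status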

Because of the rebalancing that happens across the workers every time a connector is loaded, a call to confluent status <connector-name> might not succeed immediately after a new connector is loaded. Once rebalancing completes, such calls return the actual status of the connector.

List

Predefined Connectors

Predefined connectors are just properties files in /etc/kafka.

With the cli:

confluent list connectors
Bundled Predefined Connectors (edit configuration under etc/):
  elasticsearch-sink
  file-source
  file-sink
  jdbc-source
  jdbc-sink
  hdfs-sink
  s3-sink

Running connectors
confluent status connectors
[
  "file-source"
]

Unload / Stop

confluent unload <connector-name>
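
When managing connectors through the REST API instead of the CLI, the equivalent operation is a DELETE request:

# Delete the connector and stop its tasks
curl -X DELETE http://localhost:8083/connectors/<connector-name>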
