Kafka Connect - REST API

About

The Connect REST API is the management interface for the Connect service.

Unlike many other systems, all nodes in a Kafka Connect cluster can respond to REST requests, including requests to create, list, modify, and destroy connectors.

When executed in distributed mode, the REST API is the primary interface to the cluster. You can make requests to any cluster member.
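For example, a connector can be created by POSTing a JSON payload with its name and configuration to any worker. A minimal sketch, reusing the local-file-sink configuration shown further down:

curl -s -X POST -H "Content-Type: application/json" \
  --data '{"name": "local-file-sink", "config": {"connector.class": "FileStreamSinkConnector", "file": "test.sink.txt", "tasks.max": "2", "topics": "connect-test"}}' \
  localhost:8083/connectors | jq .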

Management

Service

Start

The REST API is served by the Connect worker itself: starting a worker starts the REST interface. In distributed mode, for instance:

./bin/connect-distributed ./etc/kafka/connect-distributed.properties

Version

Get the version of the worker

curl -s localhost:8083/ | jq .

where:

  • -s suppresses curl's progress meter
  • . is the jq identity filter: it applies no transformation but pretty-prints the JSON
{
  "version": "0.11.0.0-cp1",
  "commit": "5cadaa94d0a69e0d"
}

Port

By default this service runs on port 8083.
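The port can be changed in the worker configuration before the worker is started. A minimal sketch, assuming the rest.port worker property of this Kafka version:

# worker.properties
rest.port=8083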

Connectors

List

Listing active connectors on a worker

curl -X GET http://localhost:8083/connectors

where:

  • localhost is the host where the REST service is running (the Docker machine IP, for instance)
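With the local-file-sink connector used in the examples below, the response would look like this (illustrative output):

["local-file-sink"]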

List Connector Plugins

List the connector plugins available on a worker

curl -s localhost:8083/connector-plugins | jq .
[                                                                            
  {                                                                          
    "class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "type": "sink",                                                          
    "version": "3.3.0"                                                       
  },                                                                         
  {                                                                          
    "class": "io.confluent.connect.hdfs.HdfsSinkConnector",                  
    "type": "sink",                                                          
    "version": "3.3.0"                                                       
  },                                                                         
  {                                                                          
    "class": "io.confluent.connect.hdfs.tools.SchemaSourceConnector",        
    "type": "source",                                                        
    "version": "0.11.0.0-cp1"                                                
  },                                                                         
  {                                                                          
    "class": "io.confluent.connect.jdbc.JdbcSinkConnector",                  
    "type": "sink",                                                          
    "version": "3.3.0"                                                       
  },                                                                         
  {                                                                          
    "class": "io.confluent.connect.jdbc.JdbcSourceConnector",                
    "type": "source",                                                        
    "version": "3.3.0"                                                       
  },                                                                         
  {                                                                          
    "class": "io.confluent.connect.s3.S3SinkConnector",                      
    "type": "sink",                                                          
    "version": "3.3.0"                                                       
  },                                                                         
  {                                                                          
    "class": "io.confluent.connect.storage.tools.SchemaSourceConnector",     
    "type": "source",                                                        
    "version": "0.11.0.0-cp1"                                                
  },                                                                         
  {                                                                          
    "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",        
    "type": "sink",                                                          
    "version": "0.11.0.0-cp1"                                                
  },                                                                         
  {                                                                          
    "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",      
    "type": "source",                                                        
    "version": "0.11.0.0-cp1"                                                
  }                                                                          
]
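Only the plugins present on the worker's classpath are listed; a connector class must appear in this list before it can be referenced in a connector configuration.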

Restart

Restarting a connector

curl -X POST localhost:8083/connectors/connectorName/restart
(no response body is returned on success)

Pausing

Pausing a connector

curl -X PUT localhost:8083/connectors/connectorName/pause
(no response body is returned on success)

Delete

Deleting a connector

curl -X DELETE localhost:8083/connectors/connectorName
(no response body is returned on success)

Resuming

Resuming a connector

curl -X PUT localhost:8083/connectors/connectorName/resume
(no response body is returned on success)

Status

Get the status of a connector

curl -s localhost:8083/connectors/connectorName/status | jq .
{
  "name": "local-file-sink",
  "connector": {
    "state": "RUNNING",
    "worker_id": "192.168.86.101:8083"
  },
  "tasks": [
    {
      "state": "RUNNING",
      "id": 0,
      "worker_id": "192.168.86.101:8083"
    },
    {
      "state": "RUNNING",
      "id": 1,
      "worker_id": "192.168.86.101:8083"
    }
  ]
}
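The state reported for the connector and each task is one of RUNNING, PAUSED, FAILED, or UNASSIGNED.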

Get Connector configuration

Get the connector configuration

curl -s localhost:8083/connectors/connectorName | jq .
{
  "name": "local-file-sink",
  "config": {
    "connector.class": "FileStreamSinkConnector",
    "file": "test.sink.txt",
    "tasks.max": "2",
    "topics": "connect-test",
    "name": "local-file-sink"
  },
  "tasks": [
    {
      "connector": "local-file-sink",
      "task": 0
    },
    {
      "connector": "local-file-sink",
      "task": 1
    }
  ]
}
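The configuration can be modified by PUTting an updated config object to the config endpoint (a sketch based on the configuration above, here lowering tasks.max; note that the body is the config map alone, without the name wrapper):

curl -s -X PUT -H "Content-Type: application/json" \
  --data '{"connector.class": "FileStreamSinkConnector", "file": "test.sink.txt", "tasks.max": "1", "topics": "connect-test", "name": "local-file-sink"}' \
  localhost:8083/connectors/local-file-sink/config | jq .

If the connector does not exist yet, this call creates it.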

Task

List

Getting tasks for a connector

curl -s localhost:8083/connectors/connectorName/tasks | jq .
[
  {
    "id": {
      "connector": "local-file-sink",
      "task": 0
    },
    "config": {
      "task.class": "org.apache.kafka.connect.file.FileStreamSinkTask",
      "topics": "connect-test",
      "file": "test.sink.txt"
    }
  }
]

Restarting

Restarting a task

curl -X POST localhost:8083/connectors/connectorName/tasks/0/restart
(no response body is returned on success)
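The status of a single task can also be queried (the output below is illustrative, following the status format shown above):

curl -s localhost:8083/connectors/connectorName/tasks/0/status | jq .
{
  "state": "RUNNING",
  "id": 0,
  "worker_id": "192.168.86.101:8083"
}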

