Kafka Connect - REST API

About

The Connect REST API is the management interface for the Connect service.

Unlike many other systems, every node in Kafka Connect can respond to REST requests, including requests to create, list, modify, and destroy connectors.

When executed in distributed mode, the REST API is the primary interface to the cluster. You can make requests to any cluster member.
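Creating a connector is mentioned above but not shown on this page. The following is a minimal sketch, assuming a worker reachable at localhost:8083 and reusing the local-file-sink configuration that appears in the configuration output further down. The helper names connector_payload and create_connector and the CONNECT_URL variable are illustrative, not part of Kafka Connect.

```shell
# Assumption: a Connect worker is reachable at this address (default port 8083).
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Print the JSON payload for a new connector.
# The values mirror the local-file-sink configuration shown on this page.
connector_payload() {
  cat <<'EOF'
{
  "name": "local-file-sink",
  "config": {
    "connector.class": "FileStreamSinkConnector",
    "file": "test.sink.txt",
    "tasks.max": "2",
    "topics": "connect-test"
  }
}
EOF
}

# POST the payload to /connectors; curl reads the request body from stdin (@-).
create_connector() {
  connector_payload | curl -s -X POST \
    -H "Content-Type: application/json" \
    --data @- \
    "$CONNECT_URL/connectors"
}
```

Running create_connector | jq . sends the request and pretty-prints the worker's answer. Because any cluster member accepts requests, CONNECT_URL can point at whichever worker is convenient.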

Management

Service

Start

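The REST API is served by the Connect worker itself, so it becomes available as soon as a worker is started, for example in distributed mode:

./bin/connect-distributed ./etc/kafka/connect-distributed.properties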

Version

 curl -s localhost:8083/ | jq .

where:

  • -s suppresses the progress bar
  • . is the jq identity filter: it applies no filtering but pretty-prints the JSON
{
  "version": "0.11.0.0-cp1",
  "commit": "5cadaa94d0a69e0d"
}

Port

By default this service runs on port 8083.
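A worker can take a moment to start listening on its port, which matters in scripts that start Connect and then call the API right away. A small sketch of a readiness check follows, assuming the default port 8083. The names connect_base_url and wait_for_connect and the variables CONNECT_HOST and CONNECT_PORT are hypothetical helpers, not Connect settings.

```shell
# Assumptions: host and port are illustrative defaults; adjust for your setup.
CONNECT_HOST="${CONNECT_HOST:-localhost}"
CONNECT_PORT="${CONNECT_PORT:-8083}"

# Build the base URL of the REST API from a host ($1) and a port ($2).
connect_base_url() {
  printf 'http://%s:%s' "${1:-$CONNECT_HOST}" "${2:-$CONNECT_PORT}"
}

# Poll the root endpoint until it answers with HTTP 200 or the attempts run out.
# $1 = maximum number of attempts (default 30), one second apart.
wait_for_connect() {
  attempts="${1:-30}"
  while [ "$attempts" -gt 0 ]; do
    # -w prints only the HTTP status code; the response body is discarded.
    code=$(curl -s -o /dev/null -w '%{http_code}' "$(connect_base_url)/" || true)
    [ "$code" = "200" ] && return 0
    attempts=$((attempts - 1))
    sleep 1
  done
  return 1
}
```

A typical use is wait_for_connect 60 && curl -s "$(connect_base_url)/connectors".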

Connectors

List

Listing active connectors on a worker

curl -X GET http://localhost:8083/connectors

where:

  • localhost is the host where the REST service is running (replace it with the Docker machine IP, for instance)

List Connector Plugins

List the connector plugins available on a worker

curl -s localhost:8083/connector-plugins | jq .
[                                                                            
  {                                                                          
    "class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "type": "sink",                                                          
    "version": "3.3.0"                                                       
  },                                                                         
  {                                                                          
    "class": "io.confluent.connect.hdfs.HdfsSinkConnector",                  
    "type": "sink",                                                          
    "version": "3.3.0"                                                       
  },                                                                         
  {                                                                          
    "class": "io.confluent.connect.hdfs.tools.SchemaSourceConnector",        
    "type": "source",                                                        
    "version": "0.11.0.0-cp1"                                                
  },                                                                         
  {                                                                          
    "class": "io.confluent.connect.jdbc.JdbcSinkConnector",                  
    "type": "sink",                                                          
    "version": "3.3.0"                                                       
  },                                                                         
  {                                                                          
    "class": "io.confluent.connect.jdbc.JdbcSourceConnector",                
    "type": "source",                                                        
    "version": "3.3.0"                                                       
  },                                                                         
  {                                                                          
    "class": "io.confluent.connect.s3.S3SinkConnector",                      
    "type": "sink",                                                          
    "version": "3.3.0"                                                       
  },                                                                         
  {                                                                          
    "class": "io.confluent.connect.storage.tools.SchemaSourceConnector",     
    "type": "source",                                                        
    "version": "0.11.0.0-cp1"                                                
  },                                                                         
  {                                                                          
    "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",        
    "type": "sink",                                                          
    "version": "0.11.0.0-cp1"                                                
  },                                                                         
  {                                                                          
    "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",      
    "type": "source",                                                        
    "version": "0.11.0.0-cp1"                                                
  }                                                                          
]
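With many plugins installed, the list above is easier to read when filtered by type. A short sketch using a jq filter follows; plugin_classes is a hypothetical helper name, and the field names class and type are taken from the output above.

```shell
# Print the class names of plugins of one type ("sink" or "source").
# Reads the JSON array returned by /connector-plugins on standard input.
plugin_classes() {
  jq -r --arg type "$1" '.[] | select(.type == $type) | .class'
}
```

For example, curl -s localhost:8083/connector-plugins | plugin_classes sink prints one sink connector class per line.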

Restart

Restarting a connector, where connectorName stands for the name of your connector

curl -X POST localhost:8083/connectors/connectorName/restart
(no response body is printed on success)

Pausing

Pausing a connector

curl -X PUT localhost:8083/connectors/connectorName/pause
(no response body is printed on success)

Deleting

Deleting a connector

curl -X DELETE localhost:8083/connectors/connectorName
(no response body is printed on success)

Resuming

Resuming a connector

curl -X PUT localhost:8083/connectors/connectorName/resume
(no response body is printed on success)

Status

Get the status of a connector

curl -s localhost:8083/connectors/connectorName/status | jq .
{
  "name": "local-file-sink",
  "connector": {
    "state": "RUNNING",
    "worker_id": "192.168.86.101:8083"
  },
  "tasks": [
    {
      "state": "RUNNING",
      "id": 0,
      "worker_id": "192.168.86.101:8083"
    },
    {
      "state": "RUNNING",
      "id": 1,
      "worker_id": "192.168.86.101:8083"
    }
  ]
}

Get Connector configuration

Get the name, configuration, and tasks of a connector

curl -s localhost:8083/connectors/connectorName | jq .
{
  "name": "local-file-sink",
  "config": {
    "connector.class": "FileStreamSinkConnector",
    "file": "test.sink.txt",
    "tasks.max": "2",
    "topics": "connect-test",
    "name": "local-file-sink"
  },
  "tasks": [
    {
      "connector": "local-file-sink",
      "task": 0
    },
    {
      "connector": "local-file-sink",
      "task": 1
    }
  ]
}
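The config object in this output can also be sent back to modify a connector through PUT /connectors/connectorName/config, whose request body is the flat configuration map. The following is a hedged sketch, assuming a worker at localhost:8083. The helper names config_with and update_connector_config and the CONNECT_URL variable are illustrative.

```shell
# Assumption: a Connect worker is reachable at this address.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Read connector info JSON on stdin and print its config with one key changed.
# $1 = config key, $2 = new value (passed to Connect as a string).
config_with() {
  jq --arg key "$1" --arg value "$2" '.config | .[$key] = $value'
}

# Fetch the current config, change one key, and PUT the result back.
# $1 = connector name, $2 = config key, $3 = new value.
update_connector_config() {
  curl -s "$CONNECT_URL/connectors/$1" \
    | config_with "$2" "$3" \
    | curl -s -X PUT -H "Content-Type: application/json" \
        --data @- "$CONNECT_URL/connectors/$1/config"
}
```

For example, update_connector_config local-file-sink tasks.max 1 would lower the task count of the connector shown above.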

Task

List

Getting tasks for a connector

curl -s localhost:8083/connectors/connectorName/tasks | jq .
[
  {
    "id": {
      "connector": "local-file-sink",
      "task": 0
    },
    "config": {
      "task.class": "org.apache.kafka.connect.file.FileStreamSinkTask",
      "topics": "connect-test",
      "file": "test.sink.txt"
    }
  }
]

Restarting

Restarting a task

curl -X POST localhost:8083/connectors/connectorName/tasks/0/restart
(no response body is printed on success)
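The task restart call combines naturally with the status endpoint shown earlier: read the status, pick the tasks whose state is FAILED, and restart only those. This is a minimal sketch under the assumption of a worker at localhost:8083. The helper names failed_task_ids and restart_failed_tasks and the CONNECT_URL variable are hypothetical.

```shell
# Assumption: a Connect worker is reachable at this address.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Read a connector status document on stdin and print the ids of FAILED tasks.
failed_task_ids() {
  jq -r '.tasks[] | select(.state == "FAILED") | .id'
}

# Restart every failed task of the connector named in $1.
restart_failed_tasks() {
  curl -s "$CONNECT_URL/connectors/$1/status" | failed_task_ids |
    while read -r task_id; do
      curl -s -X POST "$CONNECT_URL/connectors/$1/tasks/$task_id/restart"
    done
}
```

Calling restart_failed_tasks local-file-sink leaves running tasks untouched and issues one restart request per failed task.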


