The Connect Rest api is the management interface for the connect service.
Unlike many other systems, all nodes in Kafka Connect can respond to REST requests, including creating, listing, modifying, and destroying connectors
When executed in distributed mode, the REST API is the primary interface to the cluster. You can make requests to any cluster member.
./bin/kafka-rest-start ./etc/kafka-rest/kafka-rest.properties
curl -s localhost:8083/ | jq .
where:
{
"version": "0.11.0.0-cp1",
"commit": "5cadaa94d0a69e0d"
}
By default this service runs on port 8083.
Listing active connectors on a worker
curl -X GET http://localhost:8083/connectors
where:
List the connector plugins available on a worker
curl -s localhost:8083/connector-plugins | jq .
[
{
"class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"type": "sink",
"version": "3.3.0"
},
{
"class": "io.confluent.connect.hdfs.HdfsSinkConnector",
"type": "sink",
"version": "3.3.0"
},
{
"class": "io.confluent.connect.hdfs.tools.SchemaSourceConnector",
"type": "source",
"version": "0.11.0.0-cp1"
},
{
"class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"type": "sink",
"version": "3.3.0"
},
{
"class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"type": "source",
"version": "3.3.0"
},
{
"class": "io.confluent.connect.s3.S3SinkConnector",
"type": "sink",
"version": "3.3.0"
},
{
"class": "io.confluent.connect.storage.tools.SchemaSourceConnector",
"type": "source",
"version": "0.11.0.0-cp1"
},
{
"class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"type": "sink",
"version": "0.11.0.0-cp1"
},
{
"class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"type": "source",
"version": "0.11.0.0-cp1"
}
]
Restarting a connector
curl -X POST localhost:8083/connectors/connectorName/restart
(no response printed if success)
Pausing a connector
curl -X PUT localhost:8083/connectors/connectorName/pause
(no response printed if success)
Delete a connector
curl -X DELETE localhost:8083/connectors/connectorName
(no response printed if success)
Resuming a connector
curl -X PUT localhost:8083/connectors/connectorName/resume
(no response printed if success)
Get the status of a connector
curl -s localhost:8083/connectors/connectorName/status | jq .
{
"name": "local-file-sink",
"connector": {
"state": "RUNNING",
"worker_id": "192.168.86.101:8083"
},
"tasks": [
{
"state": "RUNNING",
"id": 0,
"worker_id": "192.168.86.101:8083"
},
{
"state": "RUNNING",
"id": 1,
"worker_id": "192.168.86.101:8083"
}
]
}
Get the connector configuration
curl localhost:8083/connectors/connectorName | jq
{
"name": "local-file-sink",
"config": {
"connector.class": "FileStreamSinkConnector",
"file": "test.sink.txt",
"tasks.max": "2",
"topics": "connect-test",
"name": "local-file-sink"
},
"tasks": [
{
"connector": "local-file-sink",
"task": 0
},
{
"connector": "local-file-sink",
"task": 1
}
]
}
Getting tasks for a connector
curl -s localhost:8083/connectors/connectorName/tasks | jq .
[
{
"id": {
"connector": "local-file-sink",
"task": 0
},
"config": {
"task.class": "org.apache.kafka.connect.file.FileStreamSinkTask",
"topics": "connect-test",
"file": "test.sink.txt"
}
}
]
Restarting a task
curl -X POST localhost:8083/connectors/connectorName/tasks/0/restart
(no response printed if success)