SQLite JDBC source connector demo.
The same steps as in the SQLite Standalone article, but with a distributed Connect worker. The services resolve through the following hosts file entries, which point at the Docker machine IP:
192.168.99.100 broker
192.168.99.100 connect
192.168.99.100 zookeeper
192.168.99.100 schema_registry
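As a sketch, assuming 192.168.99.100 is indeed your Docker machine IP (you can check it with docker-machine ip), the entries can be appended to /etc/hosts like this:
# Append the host entries (assumes the Docker machine IP is 192.168.99.100)
sudo tee -a /etc/hosts > /dev/null <<'EOF'
192.168.99.100 broker
192.168.99.100 connect
192.168.99.100 zookeeper
192.168.99.100 schema_registry
EOF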
git clone https://github.com/gerardnico/kafka.git
cd kafka/connect/sqlite
docker-compose up -d # -d for daemon or detached mode
Creating network "connect_default" with the default driver
Pulling broker (confluentinc/cp-enterprise-kafka:4.0.0)...
4.0.0: Pulling from confluentinc/cp-enterprise-kafka
aa18ad1a0d33: Already exists
249e19ca656d: Already exists
fc89c0416c76: Already exists
..................
..................
Creating connect_zookeeper_1 ...
Creating connect_zookeeper_1 ... done
Creating connect_broker_1 ...
Creating connect_broker_1 ... done
Creating connect_schema_registry_1 ...
Creating connect_schema_registry_1 ... done
Creating connect_connect_1 ...
Creating connect_connect_1 ... done
Creating connect_control-center_1 ...
Creating connect_control-center_1 ... done
docker-compose ps
Name Command State Ports
----------------------------------------------------------------------------------------------------------
connect_broker_1 /etc/confluent/docker/run Up 0.0.0.0:9092->9092/tcp
connect_connect_1 /etc/confluent/docker/run Up 0.0.0.0:8083->8083/tcp, 9092/tcp
connect_schema_registry_1 /etc/confluent/docker/run Up 0.0.0.0:8081->8081/tcp
connect_zookeeper_1 /etc/confluent/docker/run Up 0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp
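As a quick sanity check, the root resource of the Connect REST API returns the worker version; run it from the Docker machine, or replace localhost with the Docker machine IP:
# Should answer with the Connect worker version and commit id
curl -s http://localhost:8083/ | jq .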
SQLite and jq are already installed in the connect image.
For reference, to install them on a bare Debian system, the steps below are needed.
# Update to get the repo file
apt-get update
# Install sqlite and jq
apt-get install sqlite3
apt-get install jq # A utility often used to parse JSON
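A quick check that both tools are on the path:
sqlite3 --version
jq --version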
The working directory of the connect process is the directory where the service was started. We will create the SQLite database in that same directory so that the relative path in the JDBC connection URL resolves correctly without changing any Kafka configuration file.
You can check it with the following command:
docker-compose exec connect bash
# Find the PID of the connect process with pgrep
# and show its working directory
pwdx $(pgrep -f connect)
# The worker was started in the root directory (/) and its PID is 1
1: /
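Alternatively (a sketch, assuming a Linux container where the worker runs as PID 1, as the pwdx output above shows), the working directory can be read through the proc filesystem:
# Show the current working directory of PID 1
readlink /proc/1/cwd
# /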
# Create the SQLite database in the worker's working directory
cd /
sqlite3 test.db
CREATE TABLE accounts(id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, name VARCHAR(255));
INSERT INTO accounts(name) VALUES('alice');
INSERT INTO accounts(name) VALUES('bob');
.quit
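To verify the rows before starting the connector, a quick query (expected output shown as comments):
sqlite3 test.db 'SELECT * FROM accounts;'
# 1|alice
# 2|bob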
The connector configuration file is located in the container at /etc/kafka-connect-jdbc/source-quickstart-sqlite.json:
{
"name": "test-source-sqlite-jdbc-autoincrement",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "1",
"topic.prefix": "test-sqlite-jdbc-",
"connection.url" : "jdbc:sqlite:test.db",
"mode" : "incrementing",
"incrementing.column.name": "id"
}
}
where:
  connector.class is the JDBC source connector class,
  tasks.max limits the connector to a single task,
  topic.prefix is prepended to the table name to build the topic name (here test-sqlite-jdbc-accounts),
  connection.url points to the test.db file, resolved relative to the worker's working directory,
  mode and incrementing.column.name tell the connector to detect new rows through the strictly increasing id column.
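Before creating the connector, you can check that the JDBC source plugin is installed on the worker through the standard /connector-plugins endpoint (run it inside the connect container, or replace localhost with the Docker machine IP):
curl -s http://localhost:8083/connector-plugins | jq .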
Create the connector on the distributed Connect worker.
If you get an error, check the logs:
docker-compose logs connect
docker-compose exec connect bash
cd /etc/kafka-connect-jdbc/
curl -X POST -H "Content-Type: application/json" --data '@source-quickstart-sqlite.json' http://localhost:8083/connectors | jq .
# You can also run it outside of the container by replacing localhost with the IP of your Docker machine
# curl -X POST -H "Content-Type: application/json" --data '@source-quickstart-sqlite.json' http://docker-machine:8083/connectors
When successful, the call returns the connector configuration and its tasks:
{
"name": "test-source-sqlite-jdbc-autoincrement",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "1",
"topic.prefix": "test-sqlite-jdbc-",
"connection.url": "jdbc:sqlite:test.db",
"mode": "incrementing",
"incrementing.column.name": "id",
"name": "test-source-sqlite-jdbc-autoincrement"
},
"tasks": [
{
"connector": "test-source-sqlite-jdbc-autoincrement",
"task": 0
}
],
"type": null
}
# List the registered connectors
curl -s connect:8083/connectors
["test-source-sqlite-jdbc-autoincrement"]
# The topic name is the topic.prefix followed by the table name
kafka-topics --list --zookeeper zookeeper:2181 | grep test-sqlite-jdbc
test-sqlite-jdbc-accounts
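If you want more detail on the created topic (partitions, replication factor), the same tool can describe it:
kafka-topics --describe --zookeeper zookeeper:2181 --topic test-sqlite-jdbc-accounts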
Consume the records with the built-in kafka-avro-console-consumer:
docker-compose exec connect bash
kafka-avro-console-consumer --bootstrap-server broker:9092 --topic test-sqlite-jdbc-accounts --from-beginning --property="schema.registry.url=http://schema_registry:8081"
[2017-12-12 18:10:41,346] INFO Kafka version : 1.0.0-cp1 (org.apache.kafka.common.utils.AppInfoParser)
[2017-12-12 18:10:41,347] INFO Kafka commitId : ec61c5e93da662df (org.apache.kafka.common.utils.AppInfoParser)
[2017-12-12 18:10:41,596] INFO [Consumer clientId=consumer-1, groupId=console-consumer-50573] Discovered coordinator broker:9092 (id: 2147483646 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2017-12-12 18:10:41,600] INFO [Consumer clientId=consumer-1, groupId=console-consumer-50573] Revoking previously assigned partitions [] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2017-12-12 18:10:41,601] INFO [Consumer clientId=consumer-1, groupId=console-consumer-50573] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2017-12-12 18:10:41,615] INFO [Consumer clientId=consumer-1, groupId=console-consumer-50573] Successfully joined group with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2017-12-12 18:10:41,617] INFO [Consumer clientId=consumer-1, groupId=console-consumer-50573] Setting newly assigned partitions [test-sqlite-jdbc-accounts-0] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
{"id":1,"name":{"string":"alice"}}
{"id":2,"name":{"string":"bob"}}
# In a second terminal, insert a new row; the running consumer prints it immediately
docker-compose exec connect bash
sqlite3 /test.db
INSERT INTO accounts(name) VALUES('nico');
.quit
{"id":3,"name":{"string":"nico"}}
You may get the following message when trying to consume messages with the kafka-avro-console-consumer.
[2017-12-12 21:20:47,992] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$:107)
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 1
Caused by: java.net.ConnectException: Connection refused
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at java.net.Socket.connect(Socket.java:538)
at sun.net.NetworkClient.doConnect(NetworkClient.java:180)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:432)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:527)
at sun.net.www.http.HttpClient.<init>(HttpClient.java:211)
at sun.net.www.http.HttpClient.New(HttpClient.java:308)
at sun.net.www.http.HttpClient.New(HttpClient.java:326)
at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1169)
at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1105)
at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:999)
at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:933)
at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1513)
at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1441)
at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:174)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:218)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:394)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:387)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:65)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:138)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:122)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:93)
at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:148)
at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:140)
at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:146)
at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:84)
at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
This problem is caused by the fact that the schema_registry service does not run on the same host as the console consumer. By default, the kafka-avro-console-consumer expects to find the schema registry on localhost. To change that, you need to add the schema.registry.url property.
Example:
kafka-avro-console-consumer --bootstrap-server broker:9092 --topic test-sqlite-jdbc-accounts --from-beginning --property="schema.registry.url=http://schema_registry:8081"