Kafka Connect - Sqlite in Distributed Mode

1 - About

Sqlite JDBC source connector demo.

The same steps than in the Kafka Connect - Sqlite in Standalone Mode article but with a distributed worker

3 - Prerequisites

  • Install Docker: Docker - Installation
  • If you want to make the call with the kafka console utilities from your machine and not from the docker container, you need to add a mapping from each service to the docker host in your host file. This is needed because of the ADVERTISED_LISTENERS properties. For instance, where 192.168.99.100 is the docker host ip

192.168.99.100   broker
192.168.99.100   connect
192.168.99.100   zookeeper
192.168.99.100   schema_registry

4 - Steps

4.1 - Docker


git clone https://github.com/gerardnico/kafka.git 

  • Go to the sqlite connect directory and start the docker compose project

cd kafka/connect/sqlite
docker-compose up -d # -d for daemon or detached mode


Creating network "connect_default" with the default driver
Pulling broker (confluentinc/cp-enterprise-kafka:4.0.0)...
4.0.0: Pulling from confluentinc/cp-enterprise-kafka
aa18ad1a0d33: Already exists
249e19ca656d: Already exists
fc89c0416c76: Already exists
..................
..................
Creating connect_zookeeper_1 ...
Creating connect_zookeeper_1 ... done
Creating connect_broker_1 ...
Creating connect_broker_1 ... done
Creating connect_schema_registry_1 ...
Creating connect_schema_registry_1 ... done
Creating connect_connect_1 ...
Creating connect_connect_1 ... done
Creating connect_control-center_1 ...
Creating connect_control-center_1 ... done

  • Status

docker-compose ps


Name                       Command            State                     Ports
----------------------------------------------------------------------------------------------------------
connect_broker_1            /etc/confluent/docker/run   Up      0.0.0.0:9092->9092/tcp
connect_connect_1           /etc/confluent/docker/run   Up      0.0.0.0:8083->8083/tcp, 9092/tcp
connect_schema_registry_1   /etc/confluent/docker/run   Up      0.0.0.0:8081->8081/tcp
connect_zookeeper_1         /etc/confluent/docker/run   Up      0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp

4.2 - Sqlite and Jq

Sqlite and the jq utilities are already installed.

For information, to install them from a raw debian operating system, the steps below are needed.


# Update to get the repo file
apt-get update
# Install sqlite and jq
apt-get install sqlite3
apt-get install jq # An utility often used to parse json

4.3 - Kafka Services Working Directory

The working directory of the process will be the directory where the services were started. We will use it also to create the sqlite database. In this way, we don't need to change any kafka configuration file.

You can check it with the following command:


docker-compose exec connect bash
# Search the PID of the connect service pgrep
# and show the working directory
$ pwdx $(pgrep -f connect)


# It was started on the root and the process id is the first one !
1: /

4.4 - Creating a database


cd /
sqlite3 test.db

  • create a table

CREATE TABLE accounts(id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, name VARCHAR(255));
INSERT INTO accounts(name) VALUES('alice');
INSERT INTO accounts(name) VALUES('bob');
.quit

4.5 - A look at the Configuration file

The connector configuration file is located at:

  • /etc/kafka-connect-jdbc/source-quickstart-sqlite.json

{
	"name": "test-source-sqlite-jdbc-autoincrement",
	"config": {
		"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
		"tasks.max": "1",
		"topic.prefix": "test-sqlite-jdbc-",
		"connection.url" : "jdbc:sqlite:test.db",
		"mode" : "incrementing",
		"incrementing.column.name": "id"
	}
}

where:

  • connection.url is the JDBC URL
  • mode indicates how we want to query the data. incrementing means that each query for new data will only return rows with IDs larger than the maximum ID seen in previous queries for the column incrementing.column.name.
  • topic.prefix control the names of the topics. One topic by table: The only output topic in this example will be test-sqlite-jdbc-accounts.

4.6 - Create the connector

Create the connector in connect distributed.

If you got an error, check the logs:

docker-compose logs connect


docker-compose exec connect bash
cd /etc/kafka-connect-jdbc/
curl -X POST -H "Content-Type: application/json" --data '@source-quickstart-sqlite.json' http://localhost:8083/connectors | jq .

# You can also perform it outside of the container by replacing localhost by the ip of you docker machine 
# curl -X POST -H "Content-Type: application/json" --data '@source-quickstart-sqlite.json' http://docker-machine:8083/connectors

When successful, the call returns the connector configuration file:


{
  "name": "test-source-sqlite-jdbc-autoincrement",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "topic.prefix": "test-sqlite-jdbc-",
    "connection.url": "jdbc:sqlite:test.db",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "name": "test-source-sqlite-jdbc-autoincrement"
  },
  "tasks": [
    {
      "connector": "test-source-sqlite-jdbc-autoincrement",
      "task": 0
    }
  ],
  "type": null
}

4.7 - Query Kafka Information (Connectors, Topics)


curl -s connect:8083/connectors


["test-source-sqlite-jdbc-autoincrement"]

  • List the topic created by the connector by grepping the topic.prefix property value (ie test-sqlite-jdbc)

kafka-topics --list --zookeeper zookeeper:2181 | grep test-sqlite-jdbc


test-sqlite-jdbc-accounts

4.8 - Consume

With the kafka-avro-console-consumer built-in consumer


docker-compose exec connect bash
kafka-avro-console-consumer --bootstrap-server broker:9092 --topic test-sqlite-jdbc-accounts --from-beginning --property="schema.registry.url=http://schema_registry:8081"


[2017-12-12 18:10:41,346] INFO Kafka version : 1.0.0-cp1 (org.apache.kafka.common.utils.AppInfoParser)
[2017-12-12 18:10:41,347] INFO Kafka commitId : ec61c5e93da662df (org.apache.kafka.common.utils.AppInfoParser)
[2017-12-12 18:10:41,596] INFO [Consumer clientId=consumer-1, groupId=console-consumer-50573] Discovered coordinator broker:9092 (id: 2147483646 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)

[2017-12-12 18:10:41,600] INFO [Consumer clientId=consumer-1, groupId=console-consumer-50573] Revoking previously assigned partitions [] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2017-12-12 18:10:41,601] INFO [Consumer clientId=consumer-1, groupId=console-consumer-50573] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2017-12-12 18:10:41,615] INFO [Consumer clientId=consumer-1, groupId=console-consumer-50573] Successfully joined group with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2017-12-12 18:10:41,617] INFO [Consumer clientId=consumer-1, groupId=console-consumer-50573] Setting newly assigned partitions [test-sqlite-jdbc-accounts-0] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

{"id":1,"name":{"string":"alice"}}
{"id":2,"name":{"string":"bob"}}

  • two records one per line, in the JSON encoding of the Avro records.
  • Each row is represented as an Avro record and each column is a field in the record.
  • The JSON encoding of Avro encodes the strings in the format {“type”: value}, so we can see that both rows have string values with the names specified when we inserted the data.

4.9 - Produce and see the consumer reacts

  • Open a new terminal

docker-compose exec connect bash

  • Insert a new record

sqlite3 /test.db
INSERT INTO accounts(name) VALUES('nico');
.quit

  • In the consumer window, we will see a new record

{"id":3,"name":{"string":"nico"}}

5 - Support

5.1 - Error deserializing Avro message for id 1

You may got the following message when you are trying to consume messages with a kafka-console-consumer.


[2017-12-12 21:20:47,992] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$:107)
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 1
Caused by: java.net.ConnectException: Connection refused
        at java.net.PlainSocketImpl.socketConnect(Native Method)
        at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
        at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
        at java.net.Socket.connect(Socket.java:589)
        at java.net.Socket.connect(Socket.java:538)
        at sun.net.NetworkClient.doConnect(NetworkClient.java:180)
        at sun.net.www.http.HttpClient.openServer(HttpClient.java:432)
        at sun.net.www.http.HttpClient.openServer(HttpClient.java:527)
        at sun.net.www.http.HttpClient.<init>(HttpClient.java:211)
        at sun.net.www.http.HttpClient.New(HttpClient.java:308)
        at sun.net.www.http.HttpClient.New(HttpClient.java:326)
        at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1169)
        at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1105)
        at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:999)
        at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:933)
        at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1513)
        at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1441)
        at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:174)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:218)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:394)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:387)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:65)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:138)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:122)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:93)
        at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:148)
        at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:140)
        at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:146)
        at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:84)
        at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
        at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)

This problem is caused by the fact that the broker and the schema_registry services runs on different host. By default, a kafka-console-consumer expects to see the schema_registry on the localhost. To change that you need to add the schema.registry.url property.

Example:


kafka-avro-console-consumer --bootstrap-server broker:9092 --topic test-sqlite-jdbc-accounts --from-beginning --property="schema.registry.url=http://schema_registry:8081"

6 - Documentation / Reference


Data Science
Data Analysis
Statistics
Data Science
Linear Algebra Mathematics
Trigonometry

Powered by ComboStrap