Kafka Connect - Sqlite in Distributed Mode

About

Sqlite JDBC source connector demo.

The same steps as in the Kafka Connect - Sqlite in Standalone Mode article, but with a distributed worker.

Prerequisites

The following host names must resolve to the IP of the docker machine (here 192.168.99.100). Add these entries to your hosts file:

192.168.99.100   broker
192.168.99.100   connect
192.168.99.100   zookeeper
192.168.99.100   schema_registry
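
For instance, a minimal way to add these entries on Linux or macOS (assuming 192.168.99.100 is the IP of your docker machine):

# Append the host names used in this article to /etc/hosts
sudo sh -c 'echo "192.168.99.100 broker connect zookeeper schema_registry" >> /etc/hosts'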

Steps

Docker

  • Clone the repository
git clone https://github.com/gerardnico/kafka.git
  • Go to the sqlite connect directory and start the docker compose project
cd kafka/connect/sqlite
docker-compose up -d # -d for daemon or detached mode
Creating network "connect_default" with the default driver
Pulling broker (confluentinc/cp-enterprise-kafka:4.0.0)...
4.0.0: Pulling from confluentinc/cp-enterprise-kafka
aa18ad1a0d33: Already exists
249e19ca656d: Already exists
fc89c0416c76: Already exists
..................
..................
Creating connect_zookeeper_1 ...
Creating connect_zookeeper_1 ... done
Creating connect_broker_1 ...
Creating connect_broker_1 ... done
Creating connect_schema_registry_1 ...
Creating connect_schema_registry_1 ... done
Creating connect_connect_1 ...
Creating connect_connect_1 ... done
Creating connect_control-center_1 ...
Creating connect_control-center_1 ... done

  • Status
docker-compose ps
Name                       Command            State                     Ports
----------------------------------------------------------------------------------------------------------
connect_broker_1            /etc/confluent/docker/run   Up      0.0.0.0:9092->9092/tcp
connect_connect_1           /etc/confluent/docker/run   Up      0.0.0.0:8083->8083/tcp, 9092/tcp
connect_schema_registry_1   /etc/confluent/docker/run   Up      0.0.0.0:8081->8081/tcp
connect_zookeeper_1         /etc/confluent/docker/run   Up      0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp
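
Optionally, you can verify that the Connect REST API is reachable; its root endpoint returns the worker version (connect resolves to the docker machine IP, as set up in the prerequisites):

curl -s http://connect:8083/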

Sqlite and Jq

Sqlite and the jq utilities are already installed.

For reference, the steps below show how to install them on a plain Debian operating system.

# Update to get the repo file
apt-get update
# Install sqlite and jq
apt-get install sqlite3
apt-get install jq # A utility often used to parse JSON
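
To confirm that both tools are available on the path, you can check their versions:

sqlite3 --version
jq --version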

Kafka Services Working Directory

The working directory of the Kafka services is the directory where they were started. We will also use it to create the sqlite database; this way, we don't need to change any Kafka configuration file.

You can check it with the following command:

docker-compose exec connect bash
# Search the PID of the connect service with pgrep
# and show its working directory
pwdx $(pgrep -f connect)
# The connect service runs as process id 1 (the first one listed)
# and was started in the root directory
1: /

Creating a database

  • Create the database in the working directory (/)
cd /
sqlite3 test.db
  • Create a table and insert two rows
CREATE TABLE accounts(id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, name VARCHAR(255));
INSERT INTO accounts(name) VALUES('alice');
INSERT INTO accounts(name) VALUES('bob');
.quit
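
Optionally, you can verify the inserted rows from the shell; sqlite3 prints one row per line:

sqlite3 /test.db 'SELECT * FROM accounts;'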

A look at the Configuration file

The connector configuration file is located at:

  • /etc/kafka-connect-jdbc/source-quickstart-sqlite.json
{
	"name": "test-source-sqlite-jdbc-autoincrement",
	"config": {
		"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
		"tasks.max": "1",
		"topic.prefix": "test-sqlite-jdbc-",
		"connection.url" : "jdbc:sqlite:test.db",
		"mode" : "incrementing",
		"incrementing.column.name": "id"
	}
}

where:

  • connection.url is the JDBC URL
  • mode indicates how we want to query the data. incrementing means that each query for new data will only return rows with IDs larger than the maximum ID seen in previous queries of the column defined by incrementing.column.name (see the sketch after this list).
  • topic.prefix controls the names of the topics. The connector creates one topic per table: the only output topic in this example will be test-sqlite-jdbc-accounts.
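
For intuition only, the query issued in incrementing mode is roughly equivalent to the following (the actual statement is generated by the connector and may differ):

# Illustrative sketch: return only rows with an id greater than the last one seen (here 2)
sqlite3 /test.db "SELECT * FROM accounts WHERE id > 2 ORDER BY id ASC;"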

Create the connector

Create the connector on the distributed Connect worker:

docker-compose exec connect bash
cd /etc/kafka-connect-jdbc/
curl -X POST -H "Content-Type: application/json" --data '@source-quickstart-sqlite.json' http://localhost:8083/connectors | jq .

# You can also perform the call outside of the container by replacing localhost with the IP of your docker machine
# curl -X POST -H "Content-Type: application/json" --data '@source-quickstart-sqlite.json' http://docker-machine:8083/connectors

If you get an error, check the logs:

docker-compose logs connect

When successful, the call returns the connector configuration:

{
  "name": "test-source-sqlite-jdbc-autoincrement",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "topic.prefix": "test-sqlite-jdbc-",
    "connection.url": "jdbc:sqlite:test.db",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "name": "test-source-sqlite-jdbc-autoincrement"
  },
  "tasks": [
    {
      "connector": "test-source-sqlite-jdbc-autoincrement",
      "task": 0
    }
  ],
  "type": null
}
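
To verify that the connector and its task are running, you can query the status endpoint of the Connect REST API:

curl -s http://localhost:8083/connectors/test-source-sqlite-jdbc-autoincrement/status | jq .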

Query Kafka Information (Connectors, Topics)

  • List the created connectors
curl -s connect:8083/connectors
["test-source-sqlite-jdbc-autoincrement"]

  • List the topic created by the connector by grepping the topic.prefix property value (i.e. test-sqlite-jdbc)
kafka-topics --list --zookeeper zookeeper:2181 | grep test-sqlite-jdbc
test-sqlite-jdbc-accounts
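
You can also describe the topic to see its partition count and replication factor:

kafka-topics --describe --zookeeper zookeeper:2181 --topic test-sqlite-jdbc-accounts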

Consume

With the kafka-avro-console-consumer built-in consumer

docker-compose exec connect bash
kafka-avro-console-consumer --bootstrap-server broker:9092 --topic test-sqlite-jdbc-accounts --from-beginning --property="schema.registry.url=http://schema_registry:8081"
[2017-12-12 18:10:41,346] INFO Kafka version : 1.0.0-cp1 (org.apache.kafka.common.utils.AppInfoParser)
[2017-12-12 18:10:41,347] INFO Kafka commitId : ec61c5e93da662df (org.apache.kafka.common.utils.AppInfoParser)
[2017-12-12 18:10:41,596] INFO [Consumer clientId=consumer-1, groupId=console-consumer-50573] Discovered coordinator broker:9092 (id: 2147483646 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)

[2017-12-12 18:10:41,600] INFO [Consumer clientId=consumer-1, groupId=console-consumer-50573] Revoking previously assigned partitions [] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2017-12-12 18:10:41,601] INFO [Consumer clientId=consumer-1, groupId=console-consumer-50573] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2017-12-12 18:10:41,615] INFO [Consumer clientId=consumer-1, groupId=console-consumer-50573] Successfully joined group with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2017-12-12 18:10:41,617] INFO [Consumer clientId=consumer-1, groupId=console-consumer-50573] Setting newly assigned partitions [test-sqlite-jdbc-accounts-0] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

{"id":1,"name":{"string":"alice"}}
{"id":2,"name":{"string":"bob"}}

  • Two records are printed, one per line, in the JSON encoding of the Avro records.
  • Each row is represented as an Avro record and each column is a field in the record.
  • The JSON encoding of Avro encodes the strings in the format {"type": value}, so we can see that both rows have string values with the names specified when we inserted the data.
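
For reference, the Avro value schema registered by the connector can be retrieved from the schema registry (assuming the default subject name <topic>-value):

curl -s http://schema_registry:8081/subjects/test-sqlite-jdbc-accounts-value/versions/latest | jq .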

Produce and see the consumer react

  • Open a new terminal
docker-compose exec connect bash
  • Insert a new record
sqlite3 /test.db
INSERT INTO accounts(name) VALUES('nico');
.quit
  • In the consumer window, we will see a new record
{"id":3,"name":{"string":"nico"}}

Support

Error deserializing Avro message for id 1

You may get the following error when trying to consume messages with the kafka-avro-console-consumer.

[2017-12-12 21:20:47,992] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$:107)
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 1
Caused by: java.net.ConnectException: Connection refused
        at java.net.PlainSocketImpl.socketConnect(Native Method)
        at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
        at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
        at java.net.Socket.connect(Socket.java:589)
        at java.net.Socket.connect(Socket.java:538)
        at sun.net.NetworkClient.doConnect(NetworkClient.java:180)
        at sun.net.www.http.HttpClient.openServer(HttpClient.java:432)
        at sun.net.www.http.HttpClient.openServer(HttpClient.java:527)
        at sun.net.www.http.HttpClient.<init>(HttpClient.java:211)
        at sun.net.www.http.HttpClient.New(HttpClient.java:308)
        at sun.net.www.http.HttpClient.New(HttpClient.java:326)
        at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1169)
        at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1105)
        at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:999)
        at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:933)
        at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1513)
        at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1441)
        at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:174)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:218)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:394)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:387)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:65)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:138)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:122)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:93)
        at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:148)
        at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:140)
        at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:146)
        at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:84)
        at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
        at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)

This problem is caused by the fact that the broker and the schema_registry services run on different hosts. By default, the kafka-avro-console-consumer expects the schema registry to be on localhost. To change that, you need to add the schema.registry.url property.

Example:

kafka-avro-console-consumer --bootstrap-server broker:9092 --topic test-sqlite-jdbc-accounts --from-beginning --property="schema.registry.url=http://schema_registry:8081"
