Kafka Connect - Sqlite in Standalone Mode

About

SQLite JDBC source connector demo.

A modified version of the JDBC Source Connector Quickstart that uses a Docker image.

The same steps as in the distributed article, but with a standalone worker.

Steps

Docker

Start the demo base image.

docker run -ti `# Get the console of bash` \
  --rm `# Remove the container on exit` \
  --name sqlite-demo \
  -p 8083:8083 `# Map the Connect REST port to the docker machine` \
  -p 8081:8081 `# Map the Schema Registry port to the docker machine` \
  -p 2181:2181 `# Map the Zookeeper port to the docker machine` \
  -p 9092:9092 `# Map the bootstrap server port to the docker machine` \
  confluentinc/docker-demo-base:3.3.0 `# The image` \
  //bin//bash # The command to run - double slash because it is run from Cmder

Install Sqlite

Install SQLite (see SQLite - Getting Started):

apt-get update
# apt-get install unzip
apt-get install sqlite3
apt-get install jq # A utility often used to parse JSON

Start confluent

cd /tmp
confluent start
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]

Kafka Services Working Directory

The working directory of the services is the directory from which we started them, i.e. /tmp. We will also create the sqlite database there; this way, we don't need to change any Kafka configuration file.

You can check it with the following command:

# Search the PID of the connect service pgrep
# and show the working directory
$ pwdx $(pgrep -f connect)
1473: /tmp

Creating a database

sqlite3 test.db
  • Create a table and insert two rows:
CREATE TABLE accounts(id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, name VARCHAR(255));
INSERT INTO accounts(name) VALUES('alice');
INSERT INTO accounts(name) VALUES('bob');
.quit
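To double-check what the connector will see, the table can be queried back. A minimal sketch, recreated in a scratch directory so it is self-contained (in the demo you would simply run the SELECT against /tmp/test.db):

```shell
# Rebuild the demo table in a scratch directory and read it back
cd "$(mktemp -d)"
sqlite3 test.db <<'SQL'
CREATE TABLE accounts(id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, name VARCHAR(255));
INSERT INTO accounts(name) VALUES('alice');
INSERT INTO accounts(name) VALUES('bob');
SQL
# The auto-incremented ids start at 1
sqlite3 test.db 'SELECT id, name FROM accounts;'
# 1|alice
# 2|bob
```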

A look at the Configuration file

The connector configuration file is located at:

  • Windows <confluent-home>/etc/kafka-connect-jdbc/source-quickstart-sqlite.properties
  • Linux /etc/kafka-connect-jdbc/source-quickstart-sqlite.properties
# A simple example that copies all tables from a SQLite database. The first few settings are
# required for all connectors: a name, the connector class to run, and the maximum number of
# tasks to create:
name=test-sqlite-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
# The remaining configs are specific to the JDBC source connector. In this example, we connect to a
# SQLite database stored in the file test.db, use an auto-incrementing column called 'id' to
# detect new rows as they are added, and output to topics prefixed with 'test-sqlite-jdbc-', e.g.
# a table called 'users' will be written to the topic 'test-sqlite-jdbc-users'.
connection.url=jdbc:sqlite:test.db
mode=incrementing
incrementing.column.name=id
topic.prefix=test-sqlite-jdbc-

where:

  • connection.url is the JDBC connection URL.
  • mode indicates how we want to query the data. incrementing means that each query for new data will only return rows with IDs larger than the maximum ID seen in previous queries for the column incrementing.column.name.
  • topic.prefix controls the names of the topics: one topic per table. The only output topic in this example will be test-sqlite-jdbc-accounts.
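In incrementing mode, the connector keeps the highest id it has seen as its offset and, on each poll, only fetches the rows beyond it. A sketch of the equivalent query (the exact SQL the connector generates may differ; here 2 plays the role of the stored offset):

```shell
# Rebuild the demo table in a scratch directory and simulate one incrementing-mode poll
cd "$(mktemp -d)"
sqlite3 test.db <<'SQL'
CREATE TABLE accounts(id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, name VARCHAR(255));
INSERT INTO accounts(name) VALUES('alice');
INSERT INTO accounts(name) VALUES('bob');
INSERT INTO accounts(name) VALUES('nico');
SQL
# Only rows whose id is greater than the stored offset (2) come back
sqlite3 test.db 'SELECT * FROM accounts WHERE id > 2 ORDER BY id ASC;'
# 3|nico
```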

Start the worker in a standalone mode

  • Stop the current connect service, which runs in distributed mode. We do this only because we don't want to modify the worker configuration file to choose another REST port; stopping connect frees the port 8083.
confluent stop connect
  • Start a worker in standalone mode, as a daemon.
connect-standalone -daemon /etc/schema-registry/connect-avro-standalone.properties /etc/kafka-connect-jdbc/source-quickstart-sqlite.properties
  • Check the log
grep -i "finished" /logs/connectStandalone.out
[2017-10-18 12:43:26,306] INFO Finished creating connector test-source-sqlite-jdbc-autoincrement (org.apache.kafka.connect.runtime.Worker:225)
[2017-10-18 12:43:26,425] INFO Source task WorkerSourceTask{id=test-source-sqlite-jdbc-autoincrement-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:143)
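Alternatively, the worker reports the connector state over its REST API (GET /connectors/&lt;name&gt;/status; in the demo: curl -s localhost:8083/connectors/test-source-sqlite-jdbc-autoincrement/status). A sketch of filtering such a response with jq, using a captured healthy response as sample input (the worker_id value is illustrative):

```shell
# Sample of what GET /connectors/<name>/status returns on a healthy worker
STATUS='{"name":"test-source-sqlite-jdbc-autoincrement","connector":{"state":"RUNNING","worker_id":"127.0.0.1:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"127.0.0.1:8083"}]}'
# Both the connector and its single task should report RUNNING
echo "$STATUS" | jq -r '.connector.state'
# RUNNING
echo "$STATUS" | jq -r '.tasks[0].state'
# RUNNING
```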

Query Kafka Information (Connectors, Topics)

curl -s localhost:8083/connectors
["test-source-sqlite-jdbc-autoincrement"]
  • List the topics created by the connector by grepping for the topic.prefix property value (i.e. test-sqlite-jdbc):
kafka-topics --list --zookeeper localhost:2181 | grep test-sqlite-jdbc
test-sqlite-jdbc-accounts

Consume

kafka-avro-console-consumer --new-consumer --bootstrap-server localhost:9092 --topic test-sqlite-jdbc-accounts --from-beginning
{"id":1,"name":{"string":"alice"}}
{"id":2,"name":{"string":"bob"}}
  • Two records, one per line, in the JSON encoding of the Avro records.
  • Each row is represented as an Avro record and each column is a field in the record.
  • The JSON encoding of Avro encodes the strings in the format {"type": value}, so we can see that both rows have string values with the names specified when we inserted the data.
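Because nullable columns are wrapped in that {"type": value} union, extracting the plain value takes one extra step. A small jq sketch (assumes jq is installed, as above):

```shell
# Unwrap the Avro JSON union encoding of a nullable string column
echo '{"id":1,"name":{"string":"alice"}}' | jq -r '.name.string'
# alice
```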

Produce

  • Open a new terminal
docker exec -it sqlite-demo //bin//bash
  • Insert a new record
cd /tmp
sqlite3 test.db
INSERT INTO accounts(name) VALUES('nico');
.quit
  • In the consumer window, we will see a new record
{"id":3,"name":{"string":"nico"}}

Schema Evolution

schema-evolution example

The JDBC connector supports schema evolution when the Avro converter is used.

Whether we can successfully register the schema or not depends on the compatibility level of the Schema Registry, which is backward by default. For example:

  • If we remove a column from a table, the change is backward compatible and the corresponding Avro schema can be successfully registered in the Schema Registry.
  • If we modify the database table schema to change a column type, when the Avro schema is registered to the Schema Registry, it will be rejected as the changes are not backward compatible.
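What makes adding a column backward compatible here is that the new field is registered as a nullable union with a default, so consumers reading with the old schema can still decode new records. A jq sketch over the version 2 Avro schema (the same schema the registry returns later in this article):

```shell
# The version 2 Avro schema as registered in the Schema Registry
SCHEMA='{"type":"record","name":"accounts","fields":[{"name":"id","type":"long"},{"name":"name","type":["null","string"],"default":null},{"name":"age","type":["null","long"],"default":null}]}'
# The added field is a nullable union with a default value, which keeps the change backward compatible
echo "$SCHEMA" | jq -c '.fields[] | select(.name == "age")'
# {"name":"age","type":["null","long"],"default":null}
```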

Add a column

cd /tmp
sqlite3 test.db
ALTER TABLE accounts ADD COLUMN age NUMBER;
INSERT INTO accounts(name, age) VALUES('nico2', 30);
.quit
  • In the consumer window, we will see a new record
{"id":4,"name":{"string":"nico2"},"age":{"long":30}}

Get schema information

  • List the schema (subject)
curl -X GET http://localhost:8081/subjects
["test-sqlite-jdbc-accounts-value"]
  • List all schema versions registered under the subject test-sqlite-jdbc-accounts-value
curl -X GET http://localhost:8081/subjects/test-sqlite-jdbc-accounts-value/versions
[1,2]
  • Fetch version 1 of the schema registered under the subject test-sqlite-jdbc-accounts-value
curl -s -X GET http://localhost:8081/subjects/test-sqlite-jdbc-accounts-value/versions/1 | jq .
{
  "subject": "test-sqlite-jdbc-accounts-value",
  "version": 1,
  "id": 1,
  "schema": "{\"type\":\"record\",\"name\":\"accounts\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"accounts\"}"
}
  • Fetch version 2 of the schema registered under the subject test-sqlite-jdbc-accounts-value
curl -s -X GET http://localhost:8081/subjects/test-sqlite-jdbc-accounts-value/versions/2 | jq .
{
  "subject": "test-sqlite-jdbc-accounts-value",
  "version": 2,
  "id": 2,
  "schema": "{\"type\":\"record\",\"name\":\"accounts\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"age\",\"type\":[\"null\",\"long\"],\"default\":null}],\"connect.name\":\"accounts\"}"
}
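Note that the schema field in these responses is itself a JSON document serialized as an escaped string; a second pass through jq unwraps it. A sketch using the version 1 response above as sample input:

```shell
# The registry response carries the Avro schema as an escaped JSON string;
# jq -r '.schema' unescapes it, and a second jq call parses the result
RESPONSE='{"subject":"test-sqlite-jdbc-accounts-value","version":1,"id":1,"schema":"{\"type\":\"record\",\"name\":\"accounts\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"accounts\"}"}'
echo "$RESPONSE" | jq -r '.schema' | jq -r '.name'
# accounts
```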
