detect new rows as they are added, and output to topics prefixed with 'test-sqlite-jdbc-', e.g.
a table called 'users' will be written to the topic 'test-sqlite-jdbc-users'.connection.url=jdbc:sqlite:test.dbmode=incrementingincrementing.column.name=idtopic.prefix=test-sqlite-jdbc-
where:
*
connection.url is the JDBC
URL
*
mode indicates how we want to query the data.
incrementing means that each query for new data will only return rows with IDs larger than the maximum ID seen in previous queries for the column
incrementing.column.name.
*
topic.prefix control the names of the
topics. One topic by table: The only output topic in this example will be
test-sqlite-jdbc-accounts.
==== Start the worker in a standalone mode ====
* Stop the actual
connect service as a
distributed mode. Just because we don't want to modify the
worker configuration file to choose an other rest port (8083). By stopping connect, we free the port 8083.
confluent stop connect
* Start a worker in
standalone mode and as a daemon.
connect-standalone -daemon /etc/schema-registry/connect-avro-standalone.properties /etc/kafka-connect-jdbc/source-quickstart-sqlite.properties
* Check the log
cat /logs/connectStandalone.out | grep -i “finished”
[2017-10-18 12:43:26,306] INFO Finished creating connector test-source-sqlite-jdbc-autoincrement (org.apache.kafka.connect.runtime.Worker:225)[2017-10-18 12:43:26,425] INFO Source task WorkerSourceTask{id=test-source-sqlite-jdbc-autoincrement-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:143)
==== Query Kafka Information (Connectors, Topics) ====
* Check that the connector is listed with the
connector rest api
curl -s localhost:8083/connectors
[“test-source-sqlite-jdbc-autoincrement”]
* List the
topic created by the connector by grepping the
topic.prefix property value (ie
test-sqlite-jdbc)
kafka-topics –list –zookeeper localhost:2181 | grep test-sqlite-jdbc
test-sqlite-jdbc-accounts
==== Consume ====
kafka-avro-console-consumer –new-consumer –bootstrap-server localhost:9092 –topic test-sqlite-jdbc-accounts –from-beginning
{“id”:1,“name”:{“string”:“alice”}}{“id”:2,“name”:{“string”:“bob”}}
* two records one per line, in the JSON encoding of the Avro records.
* Each row is represented as an Avro record and each column is a field in the record.
* The
JSON encoding of Avro encodes the
strings in the format
{“type”: value}, so we can see that both rows have string values with the names specified when we inserted the data.
==== Produce ====
* Open a new terminal
docker exec -it sqlite-demo
binbash
* Insert a new record
cd /tmpsqlite3 test.dbINSERT INTO accounts(name) VALUES('nico');.quit
* In the consumer window, we will see a new record
{“id”:3,“name”:{“string”:“nico”}}
==== Schema Evolution ====
schema-evolution example
The JDBC connector supports schema evolution when the Avro converter is used.
Whether we can successfully register the schema or not depends on the
compatibility level of the Schema Registry, which is backward by default. For example:
* if we remove a column from a table, the change is backward compatible and the corresponding Avro schema can be successfully registered in the Schema Registry.
* If we modify the database table schema to change a column type, when the Avro schema is registered to the Schema Registry, it will be rejected as the changes are not backward compatible.
=== Add a column ===
cd /tmpsqlite3 test.dbalter table accounts add age number;INSERT INTO accounts(name) VALUES('nico2',30);.quit
* In the consumer window, we will see a new record
{“id”:3,“name”:{“string”:“nico2”},“age”:{“long”:30}}
=== Get schema information ===
* List the schema (subject)
curl -X GET
http://localhost:8081/subjects
[“test-sqlite-jdbc-accounts-value”]
* List all schema versions registered under the subject
test-sqlite-jdbc-accounts-value
curl -X GET
http://localhost:8081/subjects/test-sqlite-jdbc-accounts-value/versions
[1,2]
* Fetch version 1 of the schema registered under the subject
test-sqlite-jdbc-accounts-value
curl -s -X GET
http://localhost:8081/subjects/test-sqlite-jdbc-accounts-value/versions/1 | jq .
{ “subject”: “test-sqlite-jdbc-accounts-value”, “version”: 1, “id”: 1, “schema”: “{\”type\“:\”record\“,\”name\“:\”accounts\“,\”fields\“:[{\”name\“:\”id\“,\”type\“:\”long\“},{\”name\“:\”name\“,\”type\“:[\”null\“,\”string\“],\”default\“:null}],\”connect.name\“:\”accounts\“}”}
* Fetch version 2 of the schema registered under the subject
test-sqlite-jdbc-accounts-value
curl -s -X GET
http://localhost:8081/subjects/test-sqlite-jdbc-accounts-value/versions/2 | jq .
{“subject”: “test-sqlite-jdbc-accounts-value”,“version”: 2,“id”: 2,“schema”: “{\”type\“:\”record\“,\”name\“:\”accounts\“,\”fields\“:[{\”name\“:\”id\“,\”type\“:\”long\“},{\”name\“:\”name\“,\”type\“:[\”null\“,\”string\“],\”default\“:null},{\”name\“:\”age\“,\”type\“:[\”null\“,\”long\“],\”default\“:null}],\”connect.name\“:\”accounts\“}”}
===== Documentation / Reference =====
*
https://docs.confluent.io/current/connect/connect-jdbc/docs/source_connector.html#quickstart