* Linux ''/etc/kafka-connect-jdbc/source-quickstart-sqlite.properties''
A simple example that copies all tables from a SQLite database. The first few settings are required for all connectors: a name, the connector class to run, and the maximum number of tasks to create:

name=test-source-sqlite-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1

The remaining configs are specific to the JDBC source connector. In this example, we connect to a SQLite database stored in the file test.db, use an auto-incrementing column called 'id' to detect new rows as they are added, and output to topics prefixed with 'test-sqlite-jdbc-', e.g. a table called 'users' will be written to the topic 'test-sqlite-jdbc-users'.

connection.url=jdbc:sqlite:test.db
mode=incrementing
incrementing.column.name=id
topic.prefix=test-sqlite-jdbc-
where:
* connection.url is the JDBC URL
* mode indicates how we want to query the data. incrementing means that each query for new data will only return rows with IDs larger than the maximum ID seen in previous queries for the column incrementing.column.name.
* topic.prefix controls the names of the topics, one topic per table: the only output topic in this example will be test-sqlite-jdbc-accounts.
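* For reference, the test.db file and the accounts table that the rest of this page queries can be prepared as in the Confluent quickstart (a minimal sketch; the exact column definitions are an assumption):
cd /tmp
sqlite3 test.db
CREATE TABLE accounts(id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, name VARCHAR(255));
INSERT INTO accounts(name) VALUES('alice');
INSERT INTO accounts(name) VALUES('bob');
.quit
The two INSERT statements account for the alice and bob records seen in the Consume step below.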
==== Start the worker in standalone mode ====
* Stop the Connect service currently running in distributed mode. We do this only because we don't want to modify the worker configuration file to choose another REST port; by stopping Connect, we free the default port 8083.
confluent stop connect
* Start a worker in standalone mode, as a daemon.
connect-standalone -daemon /etc/schema-registry/connect-avro-standalone.properties /etc/kafka-connect-jdbc/source-quickstart-sqlite.properties
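* For context, the worker file /etc/schema-registry/connect-avro-standalone.properties ships with settings along these lines (a sketch of the Confluent defaults; hosts and paths may differ in your environment):
bootstrap.servers=localhost:9092
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
offset.storage.file.filename=/tmp/connect.offsets
The Avro converters are what make the connector register its schemas in the Schema Registry, which is used in the Schema Evolution section below.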
* Check the log
cat /logs/connectStandalone.out | grep -i "finished"
[2017-10-18 12:43:26,306] INFO Finished creating connector test-source-sqlite-jdbc-autoincrement (org.apache.kafka.connect.runtime.Worker:225)
[2017-10-18 12:43:26,425] INFO Source task WorkerSourceTask{id=test-source-sqlite-jdbc-autoincrement-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:143)
==== Query Kafka Information (Connectors, Topics) ====
* Check that the connector is listed with the Connect REST API
curl -s localhost:8083/connectors
["test-source-sqlite-jdbc-autoincrement"]
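* The same REST API can also report the connector and task state, which helps when the expected topic does not show up:
curl -s localhost:8083/connectors/test-source-sqlite-jdbc-autoincrement/status | jq .
The response contains a state field (e.g. RUNNING) for the connector and for each of its tasks.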
* List the topic created by the connector by grepping the topic.prefix property value (i.e. test-sqlite-jdbc)
kafka-topics --list --zookeeper localhost:2181 | grep test-sqlite-jdbc
test-sqlite-jdbc-accounts
==== Consume ====
kafka-avro-console-consumer --new-consumer --bootstrap-server localhost:9092 --topic test-sqlite-jdbc-accounts --from-beginning
{"id":1,"name":{"string":"alice"}}
{"id":2,"name":{"string":"bob"}}
* Two records, one per line, in the JSON encoding of the Avro records.
* Each row is represented as an Avro record and each column is a field in the record.
* The JSON encoding of Avro encodes the strings in the format {"type": value}, so we can see that both rows have string values with the names specified when we inserted the data.
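* Concretely, the name column is mapped to an optional (nullable) Avro field declared roughly like this (the full registered schema is shown in the Schema Registry section below):
{"name": "name", "type": ["null", "string"], "default": null}
Because the type is a union of null and string, the JSON encoding wraps non-null values as {"string": "alice"}, whereas a NULL column value would be rendered as "name": null.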
==== Produce ====
* Open a new terminal
docker exec -it sqlite-demo /bin/bash
* Insert a new record
cd /tmp
sqlite3 test.db
INSERT INTO accounts(name) VALUES('nico');
.quit
* In the consumer window, we will see a new record
{"id":3,"name":{"string":"nico"}}
==== Schema Evolution ====
schema-evolution example
The JDBC connector supports schema evolution when the Avro converter is used.
Whether we can successfully register the schema or not depends on the compatibility level of the Schema Registry, which is backward by default. For example:
* If we remove a column from a table, the change is backward compatible and the corresponding Avro schema can be successfully registered in the Schema Registry.
* If we modify the database table schema to change a column type, when the Avro schema is registered to the Schema Registry, it will be rejected as the changes are not backward compatible.
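* The active compatibility level can be checked through the Schema Registry REST API before experimenting with table changes:
curl -s -X GET http://localhost:8081/config
On a default installation this returns {"compatibilityLevel":"BACKWARD"}; a per-subject override, if any, can be read under /config/test-sqlite-jdbc-accounts-value.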
=== Add a column ===
cd /tmp
sqlite3 test.db
alter table accounts add age number;
INSERT INTO accounts(name, age) VALUES('nico2', 30);
.quit
* In the consumer window, we will see a new record
{"id":3,"name":{"string":"nico2"},"age":{"long":30}}
=== Get schema information ===
* List the schema (subject)
curl -X GET http://localhost:8081/subjects
["test-sqlite-jdbc-accounts-value"]
* List all schema versions registered under the subject test-sqlite-jdbc-accounts-value
curl -X GET http://localhost:8081/subjects/test-sqlite-jdbc-accounts-value/versions
[1,2]
* Fetch version 1 of the schema registered under the subject test-sqlite-jdbc-accounts-value
curl -s -X GET http://localhost:8081/subjects/test-sqlite-jdbc-accounts-value/versions/1 | jq .
{
  "subject": "test-sqlite-jdbc-accounts-value",
  "version": 1,
  "id": 1,
  "schema": "{\"type\":\"record\",\"name\":\"accounts\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"accounts\"}"
}
* Fetch version 2 of the schema registered under the subject test-sqlite-jdbc-accounts-value
curl -s -X GET http://localhost:8081/subjects/test-sqlite-jdbc-accounts-value/versions/2 | jq .
{
  "subject": "test-sqlite-jdbc-accounts-value",
  "version": 2,
  "id": 2,
  "schema": "{\"type\":\"record\",\"name\":\"accounts\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"age\",\"type\":[\"null\",\"long\"],\"default\":null}],\"connect.name\":\"accounts\"}"
}
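* A candidate schema can also be tested against the latest registered version before the connector tries to register it, using the Schema Registry compatibility endpoint (a sketch reusing the version 2 schema above):
curl -s -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\":\"record\",\"name\":\"accounts\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"age\",\"type\":[\"null\",\"long\"],\"default\":null}]}"}' \
  http://localhost:8081/compatibility/subjects/test-sqlite-jdbc-accounts-value/versions/latest
The response is {"is_compatible": true} when the proposed change is backward compatible, and false otherwise.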
===== Documentation / Reference =====
* https://docs.confluent.io/current/connect/connect-jdbc/docs/source_connector.html#quickstart