Kafka Connect - Sqlite in Standalone Mode

Kafka Commit Log Messaging Process

Windows /etc/kafka-connect-jdbc/source-quickstart-sqlite.properties
    * Linux ''/etc/kafka-connect-jdbc/source-quickstart-sqlite.properties''
A simple example that copies all tables from a SQLite database. The first few settings are
required for all connectors: a name, the connector class to run, and the maximum number of
tasks to create:name=test-sqlite-jdbc-autoincrementconnector.class=io.confluent.connect.jdbc.JdbcSourceConnectortasks.max=1
The remaining configs are specific to the JDBC source connector. In this example, we connect to a
SQLite database stored in the file test.db, use and auto-incrementing column called 'id' to
detect new rows as they are added, and output to topics prefixed with 'test-sqlite-jdbc-', e.g.
a table called 'users' will be written to the topic 'test-sqlite-jdbc-users'.connection.url=jdbc:sqlite:test.dbmode=incrementingincrementing.column.name=idtopic.prefix=test-sqlite-jdbc-
where: * connection.url is the JDBC URL * mode indicates how we want to query the data. incrementing means that each query for new data will only return rows with IDs larger than the maximum ID seen in previous queries for the column incrementing.column.name. * topic.prefix control the names of the topics. One topic by table: The only output topic in this example will be test-sqlite-jdbc-accounts. ==== Start the worker in a standalone mode ==== * Stop the actual connect service as a distributed mode. Just because we don't want to modify the worker configuration file to choose an other rest port (8083). By stopping connect, we free the port 8083. confluent stop connect * Start a worker in standalone mode and as a daemon. connect-standalone -daemon /etc/schema-registry/connect-avro-standalone.properties /etc/kafka-connect-jdbc/source-quickstart-sqlite.properties * Check the log cat /logs/connectStandalone.out | grep -i “finished”
[2017-10-18 12:43:26,306] INFO Finished creating connector test-source-sqlite-jdbc-autoincrement (org.apache.kafka.connect.runtime.Worker:225)[2017-10-18 12:43:26,425] INFO Source task WorkerSourceTask{id=test-source-sqlite-jdbc-autoincrement-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:143)
==== Query Kafka Information (Connectors, Topics) ==== * Check that the connector is listed with the connector rest api curl -s localhost:8083/connectors
* List the topic created by the connector by grepping the topic.prefix property value (ie test-sqlite-jdbc) kafka-topics –list –zookeeper localhost:2181 | grep test-sqlite-jdbc
==== Consume ==== kafka-avro-console-consumer –new-consumer –bootstrap-server localhost:9092 –topic test-sqlite-jdbc-accounts –from-beginning
* two records one per line, in the JSON encoding of the Avro records. * Each row is represented as an Avro record and each column is a field in the record. * The JSON encoding of Avro encodes the strings in the format {“type”: value}, so we can see that both rows have string values with the names specified when we inserted the data. ==== Produce ==== * Open a new terminal docker exec -it sqlite-demo binbash * Insert a new record cd /tmpsqlite3 test.dbINSERT INTO accounts(name) VALUES('nico');.quit * In the consumer window, we will see a new record
==== Schema Evolution ==== schema-evolution example The JDBC connector supports schema evolution when the Avro converter is used. Whether we can successfully register the schema or not depends on the compatibility level of the Schema Registry, which is backward by default. For example: * if we remove a column from a table, the change is backward compatible and the corresponding Avro schema can be successfully registered in the Schema Registry. * If we modify the database table schema to change a column type, when the Avro schema is registered to the Schema Registry, it will be rejected as the changes are not backward compatible. === Add a column === cd /tmpsqlite3 test.dbalter table accounts add age number;INSERT INTO accounts(name) VALUES('nico2',30);.quit * In the consumer window, we will see a new record
=== Get schema information === * List the schema (subject) curl -X GET http://localhost:8081/subjects
* List all schema versions registered under the subject test-sqlite-jdbc-accounts-value curl -X GET http://localhost:8081/subjects/test-sqlite-jdbc-accounts-value/versions
* Fetch version 1 of the schema registered under the subject test-sqlite-jdbc-accounts-value curl -s -X GET http://localhost:8081/subjects/test-sqlite-jdbc-accounts-value/versions/1 | jq .
{                                                                                                                                                                                                          “subject”: “test-sqlite-jdbc-accounts-value”,                                                                                                                                                            “version”: 1,                                                                                                                                                                                            “id”: 1,                                                                                                                                                                                                 “schema”: “{\”type\“:\”record\“,\”name\“:\”accounts\“,\”fields\“:[{\”name\“:\”id\“,\”type\“:\”long\“},{\”name\“:\”name\“,\”type\“:[\”null\“,\”string\“],\”default\“:null}],\”connect.name\“:\”accounts\“}”}
* Fetch version 2 of the schema registered under the subject test-sqlite-jdbc-accounts-value curl -s -X GET http://localhost:8081/subjects/test-sqlite-jdbc-accounts-value/versions/2 | jq .
{“subject”: “test-sqlite-jdbc-accounts-value”,“version”: 2,“id”: 2,“schema”: “{\”type\“:\”record\“,\”name\“:\”accounts\“,\”fields\“:[{\”name\“:\”id\“,\”type\“:\”long\“},{\”name\“:\”name\“,\”type\“:[\”null\“,\”string\“],\”default\“:null},{\”name\“:\”age\“,\”type\“:[\”null\“,\”long\“],\”default\“:null}],\”connect.name\“:\”accounts\“}”}
===== Documentation / Reference ===== * https://docs.confluent.io/current/connect/connect-jdbc/docs/source_connector.html#quickstart

Discover More
Docker Memory Kafka
Kafka - Docker

Docker usage in Confluent. The docker host must: use virtualbox as provider have a minimal of 6000 as virtual memory In docker machine command line, it means: In virtualbox, you can confirm...
Kafka Commit Log Messaging Process
Kafka - Schema

You can use an schema, for example, to: serialize an object (POJO) and deserialize it back into an object. An Avro schema defines the data structure in a JSON format. where: Schema management...
Kafka Commit Log Messaging Process
Kafka - Tutorials

A list of tutorials that I made on windows. They were my path to learn Kafka confluent In this tutorial: you start all services in a docker container you write and read data into a topic...
Kafka Commit Log Messaging Process
Kafka Connect - SQlite

in Kafka.
Kafka Commit Log Messaging Process
Kafka Connect - Sqlite in Distributed Mode

Sqlite JDBC source connector demo. The same steps than in the article but with a distributed worker Install Docker: Install Git: If you want to make the call with the kafka console utilities...

Share this page:
Follow us:
Task Runner