Kafka Connect - File Source connector
About
Reading a file with Kafka Connect. Adapted from the Kafka Connect quickstart.
Connect runs here in distributed mode; all connector operations below therefore apply to distributed mode.
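In distributed mode the worker exposes a REST API on port 8083 (the port mapped in the docker run below), which is what the confluent CLI calls under the hood. A minimal sketch of the endpoint paths, assuming the default worker at localhost:8083 (connector_url is an illustrative helper, not a real CLI):

```shell
# Kafka Connect REST API (distributed mode), assuming the worker
# listens on localhost:8083 as mapped in the docker run below.
CONNECT_URL="http://localhost:8083"

# Illustrative helper to build per-connector endpoint URLs
connector_url() { printf '%s/connectors/%s%s\n' "$CONNECT_URL" "$1" "$2"; }

# List all connector instances:  curl -s "$CONNECT_URL/connectors"
# Status of one connector:       curl -s "$(connector_url file-source /status)"
# Delete a connector:            curl -s -X DELETE "$(connector_url file-source '')"
connector_url file-source /status
```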
Articles Related
Steps
- Start the demo docker image with bash (the doubled slashes in //bin//bash avoid path mangling when running from Git Bash on Windows; plain /bin/bash works elsewhere)
docker run -ti --name connect-demo -p 8083:8083 -p 8081:8081 -p 2181:2181 -p 9092:9092 confluentinc/docker-demo-base:3.3.0 //bin//bash
- Install jq
apt-get update
apt-get install -y jq
- Start Kafka
confluent start
- Check that the file-source connector is available (its configuration template ships in /etc/kafka)
confluent list connectors | grep file-source
file-source
- Connector configuration /etc/kafka/connect-file-source.properties
cat /etc/kafka/connect-file-source.properties
# User defined connector instance name.
name=local-file-source
# The class implementing the connector
connector.class=FileStreamSource
# Maximum number of tasks to run for this connector instance
tasks.max=1
# The input file (path relative to worker's working directory)
# This is the only setting specific to the FileStreamSource
file=test.txt
# The output topic in Kafka
topic=connect-test
- Seed the file <note tip>The file path is resolved against the worker's working directory: the service was started from the current directory, so the relative path test.txt lands here</note>
for i in {1..3}; do echo "log line $i for nico demo"; done > test.txt
cat test.txt
log line 1 for nico demo
log line 2 for nico demo
log line 3 for nico demo
- Load the connector. On success it prints the connector's configuration. Note that the name given on the command line (file-source) overrides the name property from the file (local-file-source).
confluent load file-source -d /etc/kafka/connect-file-source.properties
{
"name": "file-source",
"config": {
"connector.class": "FileStreamSource",
"tasks.max": "1",
"file": "test.txt",
"topic": "connect-test",
"name": "file-source"
},
"tasks": []
}
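The JSON printed above has the same shape as the body you would POST to the worker's REST API (POST /connectors). As an illustration, a small shell sketch (props_to_json is a hypothetical helper, not part of the Confluent tooling) that turns a flat .properties file into that JSON shape:

```shell
# Hypothetical helper: convert a flat Java .properties connector config
# into the {name, config} JSON shape used by the Connect REST API.
props_to_json() {
  awk -F= '
    /^[[:space:]]*(#|$)/ { next }           # skip comments and blank lines
    {
      key = $1
      sub(/^[^=]*=/, "")                    # $0 = everything after the first "="
      if (key == "name") name = $0
      cfg = cfg sprintf("%s\"%s\": \"%s\"", (cfg == "" ? "" : ", "), key, $0)
    }
    END { printf "{\"name\": \"%s\", \"config\": {%s}}\n", name, cfg }
  ' "$1"
}

# Usage with the configuration from this article:
cat > /tmp/connect-file-source.properties <<'EOF'
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
EOF
props_to_json /tmp/connect-file-source.properties
```

As in the output above, the name also stays inside the config map; the Connect REST API accepts it there.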
- Global Status (All connectors)
confluent status connectors
[
"file-source"
]
- Status of the file-source connector
confluent status file-source
{
"name": "file-source",
"connector": {
"state": "RUNNING",
"worker_id": "172.17.0.2:8083"
},
"tasks": [
{
"state": "RUNNING",
"id": 0,
"worker_id": "172.17.0.2:8083"
}
]
}
- Consume and leave the console open. kafka-avro-console-consumer is used because the data was written to Kafka in Avro format.
kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic connect-test --from-beginning
"log line 1 for nico demo"
"log line 2 for nico demo"
"log line 3 for nico demo"
- Produce: open a new console and append lines to the text file
docker exec -it connect-demo //bin//bash
for i in {4..6}; do echo "log line $i for nico demo"; done >> test.txt
- Check the consumer console; the new lines show up:
"log line 4 for nico demo"
"log line 5 for nico demo"
"log line 6 for nico demo"