Kafka Connect - File Source connector


Reading a file with Kafka Connect. Adapted from the Kafka Connect quickstart.

Connect runs here in distributed mode, so all connector operations below apply to distributed mode only.


  • Start the demo docker image with bash
docker run -ti --name connect-demo -p 8083:8083 -p 8081:8081 -p 2181:2181 -p 9092:9092 confluentinc/docker-demo-base:3.3.0 /bin/bash
  • Install jq
apt-get update
apt-get install jq
  • Start Kafka
confluent start
  • Check that the file connector is available in /etc/kafka
confluent list connectors | grep file-source
  • Connector configuration /etc/kafka/connect-file-source.properties (the property values match the JSON configuration shown further below)
cat /etc/kafka/connect-file-source.properties
# User defined connector instance name.
name=file-source
# The class implementing the connector
connector.class=FileStreamSource
# Maximum number of tasks to run for this connector instance
tasks.max=1
# The input file (path relative to worker's working directory)
# This is the only setting specific to the FileStreamSource
file=test.txt
# The output topic in Kafka
topic=connect-test
  • Without the Schema Registry, the converter must be set to JSON via the key.converter and value.converter worker properties. Why? Because the default Avro converter of the Confluent distribution needs the Schema Registry to store and retrieve schemas, so it cannot work without it.
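For a worker without the Schema Registry, the override might look like this in the worker properties file (a sketch; the `schemas.enable` flags are an added assumption here, they control whether each JSON record carries an inline schema envelope):

```properties
# Use the JSON converter instead of the default Avro converter
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Optional: drop the schema envelope from the JSON records
key.converter.schemas.enable=false
value.converter.schemas.enable=false
```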
  • Seed the file <note tip>The file path is resolved against the current directory because we started the service from this path</note>
for i in {1..3}; do echo "log line $i for nico demo"; done > test.txt
cat test.txt
log line 1 for nico demo
log line 2 for nico demo
log line 3 for nico demo
confluent load file-source -d /etc/kafka/connect-file-source.properties
{
  "name": "file-source",
  "config": {
    "connector.class": "FileStreamSource",
    "tasks.max": "1",
    "file": "test.txt",
    "topic": "connect-test",
    "name": "file-source"
  },
  "tasks": []
}
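Under the hood, `confluent load` POSTs the connector configuration to the Connect REST API (port 8083, exposed by the docker run above). A sketch of the equivalent request; we only build and validate the payload locally with jq so the snippet runs without a live worker:

```shell
# Same configuration as above, as a JSON payload for the REST API
payload='{"name":"file-source","config":{"connector.class":"FileStreamSource","tasks.max":"1","file":"test.txt","topic":"connect-test"}}'
# Validate the payload locally with jq before sending it
echo "$payload" | jq .
# With a live worker, the actual call would be:
# curl -s -X POST -H 'Content-Type: application/json' --data "$payload" http://localhost:8083/connectors
```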
  • Global Status (All connectors)
confluent status connectors
  • Status of the file-source connector
confluent status file-source
{
  "name": "file-source",
  "connector": {
    "state": "RUNNING",
    "worker_id": ""
  },
  "tasks": [
    {
      "state": "RUNNING",
      "id": 0,
      "worker_id": ""
    }
  ]
}
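The status output pairs well with jq, which we installed earlier. A sketch using a local copy of the JSON shape shown above, so it runs without a live worker; with the stack running you would pipe `confluent status file-source` into jq instead:

```shell
# Save a local copy of the status JSON shape shown above
cat > /tmp/file-source-status.json <<'EOF'
{
  "name": "file-source",
  "connector": { "state": "RUNNING", "worker_id": "" },
  "tasks": [ { "state": "RUNNING", "id": 0, "worker_id": "" } ]
}
EOF
# Extract the connector state and each task state
jq -r '.connector.state' /tmp/file-source-status.json
jq -r '.tasks[].state' /tmp/file-source-status.json
```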
  • Consume the topic from the beginning
kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic connect-test --from-beginning
"log line 1 for nico demo"
"log line 2 for nico demo"
"log line 3 for nico demo"
  • Produce - Start a new console, add lines to the text file
docker exec -it connect-demo /bin/bash
for i in {4..6}; do echo "log line $i for nico demo"; done >> test.txt
  • Check the consumer
"log line 4 for nico demo"
"log line 5 for nico demo"
"log line 6 for nico demo"
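A local sketch of why only the new lines show up: conceptually, the FileStreamSource task remembers its position in test.txt and emits only data appended after it. This is a pure shell illustration of that behavior, not the connector's actual mechanism (the real task persists its position in Connect's offset storage):

```shell
# Mimic the connector's position tracking with a saved byte count
cd "$(mktemp -d)"
for i in {1..3}; do echo "log line $i for nico demo"; done > test.txt
offset=$(wc -c < test.txt)            # remember the position after the first read
for i in {4..6}; do echo "log line $i for nico demo"; done >> test.txt
tail -c +"$((offset + 1))" test.txt   # only the newly appended lines are emitted
```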