Kafka - kafka-console-consumer

1 - About

kafka-console-consumer is a command-line consumer that reads messages from a Kafka topic and prints them to standard output.

3 - Example

3.1 - Command line

kafka-console-consumer.sh \
    --bootstrap-server localhost:9092   \
    --topic mytopic   \
    --from-beginning  \
    --formatter kafka.tools.DefaultMessageFormatter  \
    --property print.key=true   \
    --property print.value=true 

3.1.2 - Old vs new

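# 1 New consumer: connects to the brokers (the --new-consumer flag is the default and was removed in Kafka 2.0)
kafka-console-consumer --bootstrap-server localhost:29092 --topic foo --new-consumer --from-beginning --max-messages 42

# 2 Old consumer: connects through ZooKeeper (removed in Kafka 2.0)
/usr/bin/kafka-console-consumer --zookeeper zk01.example.com:2181 --topic t1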

3.2 - Docker

Example with docker-compose, running the consumer inside the kafka service:

docker-compose exec kafka  \
  kafka-console-consumer --bootstrap-server localhost:29092 --topic foo --new-consumer --from-beginning --max-messages 42

4 - Options

Option | Description | Example

New Consumer
--bootstrap-server | Bootstrap servers: the list of brokers to connect to | broker1:9092,broker2:9092
--new-consumer | Use the new consumer implementation. This is the default. | No value

Old Consumer
--zookeeper | The connection string for the ZooKeeper connection | host:port,host:port

Addressing: Topic
--topic | The topic to consume from.
--whitelist | Whitelist of topics to include for consumption.
--blacklist | Blacklist of topics to exclude from consumption.

Addressing: Offset
--delete-consumer-offsets | If specified, the consumer path in ZooKeeper is deleted when starting up.
--from-beginning | Start with the earliest message present in the log rather than the latest message.
--offset | The offset to consume from: a non-negative number, earliest (from the beginning) or latest (from the end). Default: latest

Addressing: Partition
--partition | The partition to consume from. | Integer
--consumer-property | Pass consumer config properties in the form key=value
--consumer.config | Consumer config properties file.
--key-deserializer | Deserializer for the key | org.apache.kafka.common.serialization.StringDeserializer
--value-deserializer | Deserializer for the value | org.apache.kafka.common.serialization.StringDeserializer

Processing parameters
--skip-message-on-error | Skip the message on error instead of halting.
--timeout-ms | Exit if no message is available for consumption in the specified interval.
--isolation-level | read_committed filters out transactional messages that are not committed; read_uncommitted reads all messages. Default: read_uncommitted
--csv-reporter-enabled | If set, the CSV metrics reporter is enabled.
--metrics-dir | Output directory for the CSV metrics
--enable-systest-events | Log lifecycle events of the consumer in addition to logging consumed messages. (This is specific to system tests.)
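
As a minimal sketch of how the addressing and processing options combine, the two shell functions below wrap the consumer. The function names, the KAFKA_CONSUMER variable and the argument order are assumptions made for this example and are not part of Kafka. The first function reads a fixed number of messages from one partition starting at a given offset (--offset is only accepted together with --partition, and not together with --from-beginning). The second reads only committed transactional messages and exits once no message has arrived within the given interval.

```shell
# Command to run. Override it when the tool has another name or path
# (for example kafka-console-consumer.sh in the Apache distribution).
KAFKA_CONSUMER="${KAFKA_CONSUMER:-kafka-console-consumer}"

# Hypothetical helper: read COUNT messages from one partition, starting at OFFSET.
# usage: read_partition_slice BOOTSTRAP TOPIC PARTITION OFFSET COUNT
read_partition_slice() {
  "$KAFKA_CONSUMER" \
    --bootstrap-server "$1" \
    --topic "$2" \
    --partition "$3" \
    --offset "$4" \
    --max-messages "$5"
}

# Hypothetical helper: read only committed transactional messages from the
# beginning and exit after TIMEOUT_MS milliseconds without a new message.
# usage: read_committed_only BOOTSTRAP TOPIC TIMEOUT_MS
read_committed_only() {
  "$KAFKA_CONSUMER" \
    --bootstrap-server "$1" \
    --topic "$2" \
    --from-beginning \
    --isolation-level read_committed \
    --timeout-ms "$3"
}
```

For instance, read_partition_slice localhost:9092 mytopic 0 10 5 would read five messages from partition 0 of mytopic, starting at offset 10.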

4.1 - Console output

Console Output
--max-messages | The maximum number of messages to consume before exiting. If not set, consumption is continual. | 10
--formatter | The name of a class to use for formatting Kafka messages for display. Default: kafka.tools.DefaultMessageFormatter
--property | The properties to initialize the message formatter. | Print the key: --property print.key=true

A formatter is a class that implements MessageFormatter.

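The formatters used on this page are kafka.tools.DefaultMessageFormatter (the default) and io.confluent.kafka.formatter.AvroMessageFormatter.

The property options of the default formatter are: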

  • print.timestamp - print the timestamp
  • print.key - print the key
  • print.value - print the value
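  • key.separator and line.separator - the separator printed between the key and the value, and the one printed after each message
  • key.deserializer and value.deserializer - the deserializer classes (Serdes) for the key and the value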
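
The formatter properties listed above can be combined on one command line. The sketch below is illustrative only: the function names consume_key_value and keys_only and the KAFKA_CONSUMER variable are hypothetical. The first function prints every record as key, separator, value; the second keeps only the key from such lines. Because it relies on cut, the separator must be a single character that does not occur in the keys.

```shell
# Command to run. Override it when the tool has another name or path.
KAFKA_CONSUMER="${KAFKA_CONSUMER:-kafka-console-consumer}"

# Hypothetical helper: print each record as KEY<separator>VALUE.
# usage: consume_key_value BOOTSTRAP TOPIC SEPARATOR
consume_key_value() {
  "$KAFKA_CONSUMER" \
    --bootstrap-server "$1" \
    --topic "$2" \
    --from-beginning \
    --property print.key=true \
    --property print.value=true \
    --property key.separator="$3"
}

# Hypothetical helper: keep only the text before the first separator
# of each line read from standard input.
# usage: ... | keys_only SEPARATOR
keys_only() {
  cut -d "$1" -f 1
}
```

A possible use is consume_key_value localhost:9092 mytopic '|' | keys_only '|', which would list only the keys of the topic.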

5 - The full Java command and the code

# The main class kafka.tools.ConsoleConsumer is missing from the original listing;
# it is added here on the assumption that it is the class the script launches.
java -Xmx512M \
 -cp ":/usr/bin/../share/java/kafka-serde-tools/*:/usr/bin/../package-kafka-serde-tools/target/kafka-serde-tools-package-*-development/share/java/kafka-serde-tools/*:/usr/bin/../package-schema-registry/target/kafka-schema-registry-package-*-development/share/java/schema-registry/*:/usr/bin/../share/java/confluent-common/*:/usr/bin/../share/java/rest-utils/*:/usr/bin/../share/java/schema-registry/*" \
 kafka.tools.ConsoleConsumer \
 --formatter io.confluent.kafka.formatter.AvroMessageFormatter \
 --property schema.registry.url=http://localhost:8081 \
 --bootstrap-server broker:9092 \
 --topic test-sqlite-jdbc-accounts
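
In Confluent Platform, a Java command like the one above is normally not typed by hand: the kafka-avro-console-consumer wrapper script builds it and selects the Avro formatter. A minimal sketch, assuming that script is on the PATH (the function name consume_avro and the AVRO_CONSUMER variable are hypothetical):

```shell
# Command to run. Override it when the wrapper script lives elsewhere.
AVRO_CONSUMER="${AVRO_CONSUMER:-kafka-avro-console-consumer}"

# Hypothetical helper: consume Avro messages, decoding them with the
# schemas stored in the given Schema Registry.
# usage: consume_avro BOOTSTRAP TOPIC SCHEMA_REGISTRY_URL
consume_avro() {
  "$AVRO_CONSUMER" \
    --bootstrap-server "$1" \
    --topic "$2" \
    --property schema.registry.url="$3"
}
```

With the values of the command above, the call would be consume_avro broker:9092 test-sqlite-jdbc-accounts http://localhost:8081.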

