Kafka - kafka-console-consumer



kafka-console-consumer is a command-line consumer that reads data from a Kafka topic and writes it to standard output.


Command line

kafka-console-consumer.sh \
    --bootstrap-server localhost:9092   \
    --topic mytopic   \
    --from-beginning  \
    --formatter kafka.tools.DefaultMessageFormatter  \
    --property print.key=true   \
    --property print.value=true 
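Since the command above prints keys, a quick way to get keyed messages into the topic is the console producer with the parse.key formatter property (the topic name and separator below are illustrative):

```shell
# each line typed on stdin is split on ":" into a key and a value
kafka-console-producer.sh \
    --broker-list localhost:9092 \
    --topic mytopic \
    --property parse.key=true \
    --property key.separator=:
```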

Old vs new

# 1 New Consumer
kafka-console-consumer --bootstrap-server localhost:29092 --topic foo --new-consumer --from-beginning --max-messages 42

# 2 Old consumer
/usr/bin/kafka-console-consumer --zookeeper zk01.example.com:2181 --topic t1


Example with docker

docker-compose exec kafka  \
  kafka-console-consumer --bootstrap-server localhost:29092 --topic foo --new-consumer --from-beginning --max-messages 42
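To have data to consume, a matching console producer can be run in the same container (a sketch assuming the service name kafka and the topic foo from the compose example above; the messages are illustrative):

```shell
# pipe a few test messages into the topic read above
docker-compose exec kafka bash -c \
  'printf "msg1\nmsg2\nmsg3\n" | kafka-console-producer --broker-list localhost:29092 --topic foo'
```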


New Consumer

  --bootstrap-server          The broker(s) to connect to (see Kafka - Bootstrap Servers). Example: broker1:9092,broker2:9092
  --new-consumer              Use the new consumer implementation. This is the default. Takes no value.

Old Consumer

  --zookeeper                 The connection string for the zookeeper connection. Example: host:port,host:port

Addressing: Topic

  --topic                     The topic to consume from.
  --whitelist                 Whitelist of topics to include for consumption.
  --blacklist                 Blacklist of topics to exclude from consumption.

Addressing: Offset

  --delete-consumer-offsets   If specified, the consumer path in zookeeper is deleted on startup (see Kafka - (Consumer) Offset).
  --from-beginning            Start with the earliest message present in the log rather than the latest.
  --offset                    The offset to consume from: a non-negative number, earliest (from the beginning), or latest (from the end). Default: latest. Requires --partition.

Addressing: Partition

  --partition                 The partition to consume from (integer).

Configuration

  --consumer-property         Pass config properties in the form key=value.
  --consumer.config           Consumer config properties file.
  --key-deserializer          Deserializer class for keys. Example: org.apache.kafka.common.serialization.StringDeserializer
  --value-deserializer        Deserializer class for values. Example: org.apache.kafka.common.serialization.StringDeserializer

Processing parameters

  --skip-message-on-error     Skip messages in error instead of halting.
  --timeout-ms                Exit if no message is available for consumption in the specified interval.
  --isolation-level           read_committed filters out transactional messages that are not committed; read_uncommitted reads all messages. Default: read_uncommitted.
  --csv-reporter-enabled      If set, the CSV metrics reporter is enabled.
  --metrics-dir               Output directory for the CSV metrics (used with --csv-reporter-enabled).
  --enable-systest-events     Log consumer lifecycle events in addition to consumed messages (specific to system tests).
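Combining the addressing and processing options above, the sketch below reads five committed messages from one partition starting at a given offset (broker, topic, partition, and offset values are illustrative; --offset requires --partition):

```shell
kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 \
    --topic mytopic \
    --partition 0 \
    --offset 12 \
    --isolation-level read_committed \
    --max-messages 5
```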

Console output

  --max-messages              The maximum number of messages to consume before exiting. If not set, consumption is continual. Example: 10
  --formatter                 The name of a class to use for formatting kafka messages for display. Default: kafka.tools.DefaultMessageFormatter.
  --property                  Properties to initialize the message formatter. Example (print the key): --property print.key=true

A formatter extends the MessageFormatter class.

Kafka ships several formatters (for example kafka.tools.DefaultMessageFormatter and kafka.tools.LoggingMessageFormatter), and third parties provide others, such as io.confluent.kafka.formatter.AvroMessageFormatter.

Property options are:

  • print.timestamp - print the message timestamp
  • print.key - print the key
  • print.value - print the value
  • key.separator and line.separator - separators between key and value, and between messages
  • key.deserializer and value.deserializer - see Kafka - Serdes
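Putting the formatter properties together, a sketch that prints each message as timestamp, then key and value joined by a custom separator (broker and topic names are illustrative):

```shell
kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 \
    --topic mytopic \
    --from-beginning \
    --property print.timestamp=true \
    --property print.key=true \
    --property key.separator=" => "
```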

The full Java command

java -Xmx512M \
 -cp :/usr/bin/../share/java/kafka-serde-tools/*:/usr/bin/../package-kafka-serde-tools/target/kafka-serde-tools-package-*-development/share/java/kafka-serde-tools/*:/usr/bin/../package-schema-registry/target/kafka-schema-registry-package-*-development/share/java/schema-registry/*:/usr/bin/../share/java/confluent-common/*:/usr/bin/../share/java/rest-utils/*:/usr/bin/../share/java/schema-registry/* \
 kafka.tools.ConsoleConsumer \
 --formatter io.confluent.kafka.formatter.AvroMessageFormatter \
 --property schema.registry.url=http://localhost:8081 \
 --bootstrap-server broker:9092 \
 --topic test-sqlite-jdbc-accounts
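In practice the same invocation goes through the wrapper script rather than a raw java command (the Confluent distribution also ships kafka-avro-console-consumer, which pre-wires this formatter):

```shell
kafka-console-consumer \
  --bootstrap-server broker:9092 \
  --topic test-sqlite-jdbc-accounts \
  --formatter io.confluent.kafka.formatter.AvroMessageFormatter \
  --property schema.registry.url=http://localhost:8081
```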
