About
kafka-console-consumer is a consumer command line that:
- read data from a Kafka topic
- and write it to standard output (console).
Articles Related
Example
Command line
Print key and value
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic mytopic \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true
Old vs new
# 1 New Consumer
kafka-console-consumer --bootstrap-server localhost:29092 --topic foo --new-consumer --from-beginning --max-messages 42
# 2 Old consumer
/usr/bin/kafka-console-consumer --zookeeper zk01.example.com:2181 --topic t1
Docker
Example with docker
docker-compose exec kafka \
kafka-console-consumer --bootstrap-server localhost:29092 --topic foo --new-consumer --from-beginning --max-messages 42
Options
Option | Description | Example |
---|---|---|
New Consumer | ||
--bootstrap-server | Kafka - Bootstrap Servers | broker1:9092,broker2:9092 |
--new-consumer | Use the new consumer implementation. This is the default. | No value |
Old Consumer | ||
--zookeeper | The connection string for the zookeeper connection | host:port,host:port |
Addressing: Topic | ||
--topic | The topic id to consume on. | |
--whitelist | Whitelist of topics to include for consumption. | |
--blacklist | Blacklist of topics to exclude from consumption. | |
Addressing: Offset | ||
--delete-consumer-offsets | Kafka - (Consumer) Offset - If specified, the consumer path in zookeeper is deleted when starting up | |
--from-beginning | Start with the earliest message present in the log rather than the latest message. | |
--offset | The offset id to consume from. - a non-negative number), - or earliest (from the beginning) - or latest (from end) | default: latest |
Addressing: Partition | ||
--partition | The partition to consume from. | Integer |
Config | ||
--consumer-property | Pass config properties in the form key=value | |
--consumer.config | Consumer config properties file. | |
Serialization | ||
--key-deserializer | deserializer for key | org.apache.kafka.common. serialization.StringDeserializer |
--value-deserializer | deserializer for values | org.apache.kafka.common. serialization.StringSerializer |
Processing parameters | ||
--skip-message-on-error | Skip the errors instead of halt. | |
--timeout-ms | Exit if no message is available for consumption in the specified interval. | |
--isolation-level | Set to: read_committed in order to filter out transactional messages which are not committed. read_uncommitted to read all messages. | default: read_uncommitted |
Reporter | ||
--csv-reporter-enabled | If set, the CSV metrics reporter will be enabled | |
--metrics-dir | Output directory for the CSV metrics | |
--enable-systest-events | Log lifecycle events of the consumer in addition to logging consumed messages. (This is specific for system tests.) |
Console output
Console Output | ||
---|---|---|
--max-messages | The maximum number of messages to consume before exiting. If not set, consumption is continual. | 10 |
--formatter | The name of a class to use for formatting kafka messages for display. | default kafka.tools.DefaultMessageFormatter. |
--property | The properties to initialize the message formatter. | print a key –property print.key=true |
A formatter extends the MessageFormatter class.
There is actually:
Property options are:
- print.timestamp - print the timestamp
- print.key - print the key
- print.value - print the value
- key.separator and line.separator
- key.deserializer and value.deserializer. Kafka - Serdes
The full Java command and the code
- Full path command of the kafka-avro-console-consumer
java -Xmx512M
-server
-XX:+UseG1GC
-XX:MaxGCPauseMillis=20
-XX:InitiatingHeapOccupancyPercent=35
-XX:+DisableExplicitGC
-Djava.awt.headless=true
-Dcom.sun.management.jmxremote
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false
-Dschema-registry.log.dir=/usr/bin/../logs
-Dlog4j.configuration=file:/etc/schema-registry/log4j.properties
-cp :/usr/bin/../share/java/kafka-serde-tools/*:/usr/bin/../package-kafka-serde-tools/target/kafka-serde-tools-package-*-development/share/java/kafka-serde-tools/*:/usr/bin/../package-schema-registry/target/kafka-schema-registry-package-*-development/share/java/schema-registry/*:/usr/bin/../share/java/confluent-common/*:/usr/bin/../share/java/rest-utils/*:/usr/bin/../share/java/schema-registry/*
kafka.tools.ConsoleConsumer
--formatter io.confluent.kafka.formatter.AvroMessageFormatter
--property schema.registry.url=http://localhost:8081
--bootstrap-server broker:9092
--topic test-sqlite-jdbc-accounts
--from-beginning