Kafka - (Consumer) Offset


The offset is the position of a consumer in a partition of a topic.

An offset is not the record key: it is a sequential position id that Kafka assigns automatically to each record in a partition.

Before version 0.9, Apache ZooKeeper was used to manage the offsets of a consumer group. Since 0.9, the Java consumer commits its offsets to Kafka itself, in the internal __consumer_offsets topic. Offset management is the mechanism that tracks how many records a particular consumer group has consumed from each partition of a topic. Kafka 0.10 added out-of-the-box support for stream processing.
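As a rough illustration of the two ideas above (an offset is a position rather than a key, and a group's progress is tracked per partition), here is a toy in-memory model in Python. It is only a sketch, not how Kafka is implemented: the names PartitionLog, append, and committed are hypothetical, and no Kafka client is involved.

```python
class PartitionLog:
    """Toy stand-in for one partition: an append-only list of (key, value) records."""

    def __init__(self):
        self.records = []

    def append(self, key, value):
        # The offset is simply the position at which the record is stored.
        self.records.append((key, value))
        return len(self.records) - 1

    @property
    def log_end_offset(self):
        # Offset that the next appended record will receive.
        return len(self.records)


log = PartitionLog()
first = log.append("user-1", "created")
second = log.append("user-1", "updated")  # same key, different offset

# Hypothetical committed offsets, tracked per (group, partition):
# the value is the offset of the next record the group will read.
committed = {("my-group", 0): 1}

lag = log.log_end_offset - committed[("my-group", 0)]
print(first, second, lag)  # prints: 0 1 1
```

Two records with the same key get different offsets, and the lag is the distance between the end of the log and the group's committed position.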



Reset tool


In distributed mode, Kafka Connect stores the offsets of each connector in a special topic called connect-offsets.

You can read this topic with kafka-console-consumer:

kafka-console-consumer \
--bootstrap-server localhost:9092 \
--from-beginning \
--property print.key=true \
--topic connect-offsets

Example output, obtained by following the file-source demo:

["file-source",{"filename":"test.txt"}] {"position":75}
["file-source",{"filename":"test.txt"}] {"position":150}
["file-source",{"filename":"test.txt"}] {"position":225}
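Each printed record above is a JSON key (connector name and source partition) followed by a JSON value (the source offset). A minimal Python sketch for reading such lines is shown below. It assumes the console output has been captured as text; the function name parse_offset_line is hypothetical, and records with a null value are not handled.

```python
import json

sample = """\
["file-source",{"filename":"test.txt"}] {"position":75}
["file-source",{"filename":"test.txt"}] {"position":150}
["file-source",{"filename":"test.txt"}] {"position":225}
"""


def parse_offset_line(line):
    """Split one printed record into (connector, source partition, source offset)."""
    decoder = json.JSONDecoder()
    # raw_decode parses the first JSON document and reports where it ended,
    # so the key/value separator (tab or space) does not matter.
    key, end = decoder.raw_decode(line)
    value = json.loads(line[end:].strip())
    connector, source_partition = key
    return connector, source_partition, value


latest = {}
for line in sample.splitlines():
    connector, source_partition, offset = parse_offset_line(line)
    # Later records for the same key supersede earlier ones.
    latest[(connector, json.dumps(source_partition, sort_keys=True))] = offset

print(latest)
```

Keeping only the last value per key mirrors how the most recent record for a connector and source partition is the one that matters when the connector resumes.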


How to check the number of messages read (CURRENT-OFFSET) and written (LOG-END-OFFSET), as well as the lag, for each partition consumed by a specific consumer group:


kafka-consumer-groups --bootstrap-server broker:9092 --describe --group console-consumer-36650
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).

Consumer group 'console-consumer-36650' has no active members.

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
test-sqlite-jdbc-accounts      0          2               8               6          -                                                 -                              -
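In the table above, LAG is LOG-END-OFFSET minus CURRENT-OFFSET (8 - 2 = 6). The following Python sketch recomputes it from captured output. It assumes the text contains only the header and data rows (the note and the "no active members" lines removed) and that columns are separated by whitespace; parse_describe is a hypothetical helper name.

```python
sample = """\
TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
test-sqlite-jdbc-accounts      0          2               8               6          -                                                 -                              -
"""


def parse_describe(text):
    """Turn the whitespace-separated table into a list of dicts keyed by column name."""
    lines = [line for line in text.splitlines() if line.strip()]
    header = lines[0].split()
    return [dict(zip(header, line.split())) for line in lines[1:]]


rows = parse_describe(sample)
for row in rows:
    if row["CURRENT-OFFSET"] == "-":
        # No committed offset for this partition, so no lag can be computed.
        continue
    lag = int(row["LOG-END-OFFSET"]) - int(row["CURRENT-OFFSET"])
    print(row["TOPIC"], row["PARTITION"], lag)  # prints: test-sqlite-jdbc-accounts 0 6
```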


ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand (the kafka-consumer-groups command shown above) instead.

kafka-consumer-offset-checker --group consumer-group --topic test-sqlite-jdbc-accounts --zookeeper zookeeper:2181


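GetOffsetShell gets the offsets of the partitions of a topic: the latest offset by default, or the earliest with --time -2. Running it without arguments prints its options.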

kafka-run-class kafka.tools.GetOffsetShell
Option                                 Description                                 
------                                 -----------                                 
--broker-list <String: hostname:       REQUIRED: The list of hostname and          
  port,...,hostname:port>                port of the server to connect to.         
--max-wait-ms <Integer: ms>            The max amount of time each fetch           
                                         request waits. (default: 1000)            
--offsets <Integer: count>             number of offsets returned (default: 1)     
--partitions <String: partition ids>   comma separated list of partition ids.      
                                         If not specified, it will find            
                                         offsets for all partitions (default:      
--time <Long: timestamp/-1(latest)/-2  timestamp of the offsets before that        
  (earliest)>                            (default: -1)                             
--topic <String: topic>                REQUIRED: The topic to get offset from.     


kafka-run-class kafka.tools.GetOffsetShell --broker-list "broker:9092" --topic mytopic
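GetOffsetShell prints one line per partition in the form topic:partition:offset. Assuming that format, the Python sketch below estimates how many records each partition currently holds by subtracting the earliest offset (--time -2) from the latest (--time -1). The sample strings are hypothetical values, not captured from a real run, and parse_offsets is a hypothetical helper name.

```python
def parse_offsets(output):
    """Map partition id -> offset from 'topic:partition:offset' lines."""
    offsets = {}
    for line in output.splitlines():
        line = line.strip()
        if not line:
            continue
        # rsplit keeps the last two fields as partition and offset.
        topic, partition, offset = line.rsplit(":", 2)
        offsets[int(partition)] = int(offset)
    return offsets


latest_output = "mytopic:0:8\nmytopic:1:5\n"    # hypothetical, as from --time -1
earliest_output = "mytopic:0:2\nmytopic:1:0\n"  # hypothetical, as from --time -2

latest = parse_offsets(latest_output)
earliest = parse_offsets(earliest_output)
retained = {p: latest[p] - earliest[p] for p in latest}
print(retained)  # prints: {0: 6, 1: 5}
```

The difference is only an estimate: on compacted topics, for example, some offsets in the range no longer correspond to a stored record.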


ExportZkOffsets retrieves the offsets of broker partitions stored in ZooKeeper for a consumer group and writes them to an output file.

ExportZkOffsets is deprecated.

kafka-run-class.sh kafka.tools.ExportZkOffsets
WARN WARNING: ExportZkOffsets is deprecated and will be dropped in a future release following (kafka.tools.ExportZkOffsets$)
Export consumer offsets to an output file.

Option                  Description
------                  -----------
--group <String>        Consumer group.
--help                  Print this message.
--output-file <String>  Output file
--zkconnect <String>    ZooKeeper connect string. (default: localhost:2181)
