Kafka - Message Timestamp



The message timestamp is used to control the progress of streams. Depending on the application, it can be defined in different ways.


The timestamp of a message can be retrieved from different sources:


event-time processing aka “producer time” is the default. This represents the time when the Kafka producer sent the original message.


ingestion-time processing aka “broker time” is the time when the Kafka broker received the original message.


processing-time processing aka “wall-clock time” uses the current time in milliseconds from the system clock as the timestamp. Kafka Streams then operates on the basis of the so-called processing-time of events.


timestamps embedded in the payload of messages.
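The difference between these sources can be sketched in plain Java, without any Kafka dependency. The `TimestampSources` class, its `Message` fields, and the `TimeSemantics` enum are hypothetical names introduced only for this illustration; they are not part of the Kafka API.

```java
// Hypothetical model, not the Kafka API: one message, four candidate timestamps.
final class TimestampSources {

  enum TimeSemantics { EVENT_TIME, INGESTION_TIME, PROCESSING_TIME, PAYLOAD_TIME }

  static final class Message {
    final long producerSentAtMillis;   // set by the producer ("producer time")
    final long brokerReceivedAtMillis; // set by the broker ("broker time")
    final long payloadTimestampMillis; // carried inside the message value

    Message(long producerSentAtMillis, long brokerReceivedAtMillis, long payloadTimestampMillis) {
      this.producerSentAtMillis = producerSentAtMillis;
      this.brokerReceivedAtMillis = brokerReceivedAtMillis;
      this.payloadTimestampMillis = payloadTimestampMillis;
    }
  }

  // Returns the timestamp that would drive stream progress under the given semantics.
  // `nowMillis` stands in for the system clock so the result is deterministic.
  static long timestampOf(Message message, TimeSemantics semantics, long nowMillis) {
    switch (semantics) {
      case EVENT_TIME:
        return message.producerSentAtMillis;
      case INGESTION_TIME:
        return message.brokerReceivedAtMillis;
      case PROCESSING_TIME:
        return nowMillis;
      case PAYLOAD_TIME:
        return message.payloadTimestampMillis;
      default:
        throw new IllegalArgumentException("Unknown semantics: " + semantics);
    }
  }
}
```

The same message can therefore advance a stream at different points in time, depending only on which source the application picks.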




Built-in timestamps are automatically embedded into Kafka messages by the Kafka producer client (since Kafka



The built-in timestamp can be printed with kafka-console-consumer and the property print.timestamp:

kafka-console-consumer.sh \
   --property print.timestamp=true \


A timestamp extractor, configured with the timestamp.extractor property, extracts a timestamp from an instance of ConsumerRecord.


The default extractor is ConsumerRecordTimestampExtractor. It extracts the built-in timestamp.

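The value of the broker configuration log.message.timestamp.type controls which built-in timestamp is stored: CreateTime (the default) keeps the producer's timestamp (event-time), while LogAppendTime replaces it with the broker's append time (ingestion-time).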
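As a rough illustration of what this setting changes, the sketch below models the choice in plain Java. `CreateTime` and `LogAppendTime` are the two values Kafka accepts for this configuration; the `BuiltInTimestamp` class and its methods are hypothetical and do not call the Kafka API.

```java
// Hypothetical model of how the built-in timestamp is chosen; not Kafka code.
final class BuiltInTimestamp {

  // Mirrors the two accepted values of log.message.timestamp.type.
  enum TimestampType { CREATE_TIME, LOG_APPEND_TIME }

  // Parses the configuration value as it is written in the configuration.
  static TimestampType parse(String configValue) {
    if ("CreateTime".equals(configValue)) {
      return TimestampType.CREATE_TIME;
    }
    if ("LogAppendTime".equals(configValue)) {
      return TimestampType.LOG_APPEND_TIME;
    }
    throw new IllegalArgumentException("Unsupported timestamp type: " + configValue);
  }

  // CreateTime keeps the producer's timestamp; LogAppendTime uses the broker's append time.
  static long stored(TimestampType type, long producerMillis, long brokerAppendMillis) {
    return type == TimestampType.LOG_APPEND_TIME ? brokerAppendMillis : producerMillis;
  }
}
```

With `CreateTime` the default extractor yields event-time; with `LogAppendTime` the same extractor yields ingestion-time, without any change on the Streams side.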


The WallclockTimestampExtractor extractor returns the processing-time of events.
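Selecting processing-time could look like the following minimal sketch. It uses the plain string key `timestamp.extractor` named on this page and the fully qualified class name of WallclockTimestampExtractor, so it compiles without the Kafka libraries; the `ProcessingTimeConfig` wrapper class is a hypothetical name. Later Kafka Streams releases use the key `default.timestamp.extractor` instead, so check the StreamsConfig documentation for your version.

```java
import java.util.Properties;

// Hypothetical helper that builds Streams settings for processing-time semantics.
final class ProcessingTimeConfig {

  static Properties settings() {
    Properties settings = new Properties();
    // Key as named on this page; newer releases may expect "default.timestamp.extractor".
    settings.put("timestamp.extractor",
        "org.apache.kafka.streams.processor.WallclockTimestampExtractor");
    return settings;
  }
}
```

The resulting `Properties` object would be passed to the Streams application together with its other settings.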


A custom TimestampExtractor retrieves the payload-time timestamp (i.e. the one embedded in the payload of messages).

Example of a custom TimestampExtractor implementation:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Extracts the embedded timestamp of a record (giving you "event time" semantics).
public class MyEventTimeExtractor implements TimestampExtractor {

  @Override
  public long extract(ConsumerRecord<Object, Object> record) {
    // `Foo` is your own custom class, which we assume has a method that returns
    // the embedded timestamp (in milliseconds).
    Foo myPojo = (Foo) record.value();
    if (myPojo != null) {
      return myPojo.getTimestampInMillis();
    } else {
      // Kafka allows `null` as message value.  How to handle such message values
      // depends on your use case.  In this example, we decide to fall back to
      // wall-clock time (= processing-time).
      return System.currentTimeMillis();
    }
  }
}

You would then define the custom timestamp extractor in your Streams configuration as follows:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties settings = new Properties();
settings.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyEventTimeExtractor.class.getName());
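An extractor has to return the timestamp in milliseconds. If the payload carries its time as text instead, the conversion might look like the sketch below. It assumes an ISO-8601 UTC string such as `2016-05-01T12:00:00Z`; both that payload format and the `PayloadTimestamps` helper are assumptions made for illustration and are not part of Kafka.

```java
import java.time.Instant;
import java.time.format.DateTimeParseException;

// Hypothetical helper: converts an ISO-8601 payload timestamp to epoch milliseconds.
final class PayloadTimestamps {

  // Returns the parsed timestamp, or `fallbackMillis` when the value is missing or malformed.
  static long toEpochMillis(String isoTimestamp, long fallbackMillis) {
    if (isoTimestamp == null) {
      return fallbackMillis;
    }
    try {
      return Instant.parse(isoTimestamp).toEpochMilli();
    } catch (DateTimeParseException e) {
      return fallbackMillis;
    }
  }
}
```

Inside an extractor, the fallback could be `System.currentTimeMillis()`, which matches the wall-clock fallback used for `null` values above.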
