Kafka - Message Timestamp


The message timestamp is used to control the progress of streams and dependent of the application can be differently defined. See source


The timestamp of a message can be retrieved from different source.


event-time processing aka “producer time” is the default. This represents the time when the Kafka producer sent the original message.


ingestion-time processing aka “broker time” is the time when the Kafka broker received the original message.


The timestamp will be the current time in milliseconds from the system clock. The Streams will operate on the basis of the so-called processing-time of events.


timestamps embedded in the payload of messages.




Built-in timestamp are automatically embedded into Kafka messages by the Kafka producer client (since Kafka



Kafka - kafka-console-consumer with the property print.timestamp

kafka-console-consumer.sh \
   --property print.timestamp \


A timestamp extractor configured with the timestamp.extractor properties: extracts a timestamp from an instance of ConsumerRecord.


The default extractor is ConsumerRecordTimestampExtractor and extract the built-in timestamp.

The value of the configuration log.message.timestamp.type controls the type:


The WallclockTimestampExtractor extractor returns the processing-time of events.


A custom TimestampExtractor retrieve the payload-time timestamp (ie embedded in the payload of messages).

Example of a custom TimestampExtractor implementation:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Extracts the embedded timestamp of a record (giving you "event time" semantics).
public class MyEventTimeExtractor implements TimestampExtractor {

  public long extract(ConsumerRecord<Object, Object> record) {
    // `Foo` is your own custom class, which we assume has a method that returns
    // the embedded timestamp (in milliseconds).
    Foo myPojo = (Foo) record.value();
    if (myPojo != null) {
      return myPojo.getTimestampInMillis();
    else {
      // Kafka allows `null` as message value.  How to handle such message values
      // depends on your use case.  In this example, we decide to fallback to
      // wall-clock time (= processing-time).
      return System.currentTimeMillis();


You would then define the custom timestamp extractor in your Streams configuration as follows:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties settings = new Properties();
settings.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyEventTimeExtractor.class.getName());

Documentation / Reference

Powered by ComboStrap