About

The message timestamp is used to control the progress of streams. Depending on the application, it can be defined in different ways; see the Source section.

Source

The timestamp of a message can be retrieved from different sources.

Event-time

Event-time processing, also known as “producer time”, is the default. It represents the time when the Kafka producer sent the original message.

Ingestion-time

Ingestion-time processing, also known as “broker time”, uses the time when the Kafka broker received the original message.

Processing-time

The timestamp is the current time in milliseconds, read from the system clock when the record is processed. Kafka Streams then operates on the so-called processing-time of events.

Payload-time

Payload-time uses a timestamp embedded in the payload of the message.
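
The four sources can be contrasted with a small model. This is a minimal sketch in plain Java, not Kafka API: the TimedMessage class, the TimestampSource enum and all field names are hypothetical and exist only for illustration. The current time is passed in as a parameter so the result is deterministic.

```java
/** Hypothetical model of one message, used only to contrast the four timestamp sources. */
final class TimedMessage {
    final long producerSentMillis;   // when the producer sent the message (event-time)
    final long brokerReceivedMillis; // when the broker received it (ingestion-time)
    final long payloadMillis;        // timestamp carried inside the payload (payload-time)

    TimedMessage(long producerSentMillis, long brokerReceivedMillis, long payloadMillis) {
        this.producerSentMillis = producerSentMillis;
        this.brokerReceivedMillis = brokerReceivedMillis;
        this.payloadMillis = payloadMillis;
    }
}

/** The four sources described above; the names are illustrative, not Kafka API. */
enum TimestampSource {
    EVENT_TIME, INGESTION_TIME, PROCESSING_TIME, PAYLOAD_TIME;

    /** Returns the timestamp this source would assign to the message. */
    long timestampOf(TimedMessage message, long nowMillis) {
        switch (this) {
            case EVENT_TIME:      return message.producerSentMillis;
            case INGESTION_TIME:  return message.brokerReceivedMillis;
            case PAYLOAD_TIME:    return message.payloadMillis;
            case PROCESSING_TIME: return nowMillis; // system clock at processing time
            default: throw new AssertionError("unknown source: " + this);
        }
    }
}
```

The same message therefore gets a different timestamp depending on the source chosen, which in turn changes how a stream application orders and windows it.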

Management

Set

Built-in

Built-in timestamps are automatically embedded into Kafka messages by the Kafka producer client (since Kafka 0.10.0.0).

Get

kafka-console-consumer

Run kafka-console-consumer with the property print.timestamp set to true:

kafka-console-consumer.sh \
   ....
   --property print.timestamp=true \
   .....
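
If the printed timestamps need to be read back by a program, the prefix can be parsed. The sketch below assumes each output line starts with a timestamp type and value such as CreateTime:1464091200000, followed by a tab and the rest of the record; that line shape is an assumption to verify against your Kafka version, and the ConsoleTimestampLine helper is hypothetical.

```java
/** Hypothetical helper: reads the timestamp prefix from one line of console-consumer output. */
final class ConsoleTimestampLine {

    /**
     * Assumes a line shaped like "CreateTime:1464091200000<TAB>value".
     * Returns the timestamp in milliseconds, or -1 when the prefix is missing or malformed.
     */
    static long timestampMillis(String line) {
        int tab = line.indexOf('\t');
        String prefix = tab >= 0 ? line.substring(0, tab) : line;
        int colon = prefix.indexOf(':');
        if (colon < 0) {
            return -1L; // no "Type:millis" prefix on this line
        }
        try {
            return Long.parseLong(prefix.substring(colon + 1).trim());
        } catch (NumberFormatException e) {
            return -1L; // prefix present but not a number
        }
    }
}
```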

API

A timestamp extractor, configured with the timestamp.extractor property, extracts a timestamp from an instance of ConsumerRecord.

ConsumerRecordTimestampExtractor

The default extractor is ConsumerRecordTimestampExtractor, which extracts the built-in timestamp.

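The broker configuration log.message.timestamp.type controls which built-in timestamp is stored: CreateTime (the default) keeps the producer's event-time, while LogAppendTime overwrites it with the broker's ingestion-time.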

WallclockTimestampExtractor

WallclockTimestampExtractor returns the processing-time of events, that is, the current system clock time in milliseconds.
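
Selecting this extractor could look like the sketch below. It uses the plain string key timestamp.extractor mentioned earlier and the fully qualified class name, so it depends only on the JDK; the ProcessingTimeConfig wrapper class is hypothetical.

```java
import java.util.Properties;

/** Builds a Streams configuration that selects processing-time semantics. */
final class ProcessingTimeConfig {

    static Properties settings() {
        Properties settings = new Properties();
        // Key: the timestamp.extractor property.
        // Value: the fully qualified class name of the extractor to use.
        settings.put("timestamp.extractor",
                "org.apache.kafka.streams.processor.WallclockTimestampExtractor");
        return settings;
    }
}
```

The resulting Properties object would be passed to the Streams application together with the rest of its configuration.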

CustomTimestampExtractor

A custom TimestampExtractor retrieves the payload-time timestamp (i.e. the timestamp embedded in the payload of the message).

Example of a custom TimestampExtractor implementation:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Extracts the embedded timestamp of a record (giving you "event time" semantics).
public class MyEventTimeExtractor implements TimestampExtractor {

  @Override
  public long extract(ConsumerRecord<Object, Object> record) {
    // `Foo` is your own custom class, which we assume has a method that returns
    // the embedded timestamp (in milliseconds).
    Foo myPojo = (Foo) record.value();
    if (myPojo != null) {
      return myPojo.getTimestampInMillis();
    }
    else {
      // Kafka allows `null` as message value.  How to handle such message values
      // depends on your use case.  In this example, we decide to fall back to
      // wall-clock time (= processing-time).
      return System.currentTimeMillis();
    }
  }

}
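
The Foo class is left undefined in the example above. A minimal sketch of what it might look like follows, assuming the payload carries its creation time as an ISO-8601 string; the createdAt field and its format are hypothetical, and only the getTimestampInMillis() method is required by the extractor.

```java
import java.time.Instant;

/** Hypothetical payload class; only getTimestampInMillis() is required by the extractor. */
final class Foo {
    // Assumed ISO-8601 instant, e.g. "2016-05-24T12:00:00Z".
    private final String createdAt;

    Foo(String createdAt) {
        this.createdAt = createdAt;
    }

    /** Returns the embedded timestamp as milliseconds since the Unix epoch. */
    public long getTimestampInMillis() {
        return Instant.parse(createdAt).toEpochMilli();
    }
}
```

Instant.parse throws a DateTimeParseException on malformed input, so a production extractor would likely need to decide how to handle such records as well as null values.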

You would then define the custom timestamp extractor in your Streams configuration as follows:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties settings = new Properties();
settings.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyEventTimeExtractor.class.getName());
