Kafka - Message Timestamp
About
The message timestamp controls how a stream progresses in time. Depending on the application, it can be defined in different ways (see the Source section).
Source
The timestamp of a message can be retrieved from different sources.
Event-time
Event-time processing, aka “producer time”, is the default. It represents the time when the Kafka producer sent the original message.
Ingestion-time
Ingestion-time processing, aka “broker time”, uses the time when the Kafka broker received the original message.
Processing-time
The timestamp is the current time in milliseconds from the system clock at the moment the record is processed. Kafka Streams then operates on the so-called processing-time of events.
Payload-time
Payload-time uses timestamps embedded in the payload of messages.
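To make the four sources concrete, the following sketch models them in plain Java. It does not use the Kafka API: the `TimestampSourceSketch` class, its `Message` type, the field names, and `timestampFor` are all hypothetical names chosen for illustration only.

```java
import java.util.function.LongSupplier;

// Illustrative model only: none of these types belong to the Kafka API.
final class TimestampSourceSketch {

    enum Source { EVENT_TIME, INGESTION_TIME, PROCESSING_TIME, PAYLOAD_TIME }

    // Hypothetical message carrying every candidate timestamp (epoch milliseconds).
    static final class Message {
        final long producerSentMillis;   // when the producer sent the message
        final long brokerReceivedMillis; // when the broker received the message
        final long payloadMillis;        // timestamp embedded in the message body

        Message(long producerSentMillis, long brokerReceivedMillis, long payloadMillis) {
            this.producerSentMillis = producerSentMillis;
            this.brokerReceivedMillis = brokerReceivedMillis;
            this.payloadMillis = payloadMillis;
        }
    }

    // Returns the timestamp that applies under the chosen source.
    // The clock is injected so that processing-time stays deterministic in tests.
    static long timestampFor(Message message, Source source, LongSupplier clock) {
        switch (source) {
            case EVENT_TIME:      return message.producerSentMillis;
            case INGESTION_TIME:  return message.brokerReceivedMillis;
            case PROCESSING_TIME: return clock.getAsLong();
            case PAYLOAD_TIME:    return message.payloadMillis;
            default: throw new IllegalArgumentException("unknown source: " + source);
        }
    }

    public static void main(String[] args) {
        Message m = new Message(1_000L, 1_005L, 900L);
        for (Source s : Source.values()) {
            System.out.println(s + " -> " + timestampFor(m, s, System::currentTimeMillis));
        }
    }
}
```

Only processing-time depends on when the code runs; the other three values travel with the message, which is why replaying the same topic gives the same timestamps for them.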
Management
Set
Built-in
Built-in timestamps are automatically embedded into Kafka messages by the Kafka producer client (since Kafka 0.10.0.0):
- event-time aka “producer time” (default)
- ingestion-time aka “broker time”
Get
kafka-console-consumer
Use kafka-console-consumer with the property print.timestamp:
kafka-console-consumer.sh \
....
--property print.timestamp=true \
.....
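With the timestamp enabled, each printed line starts with the timestamp type and value. The sketch below turns such a line into a readable instant. It assumes the default formatter output has the shape `CreateTime:<epoch millis>`, then a tab, then the value; check this against the output of your Kafka version. `ConsoleTimestampLine` and its methods are hypothetical helper names, not part of Kafka.

```java
import java.time.Instant;

// Hypothetical helper for lines printed by kafka-console-consumer.
// Assumed line shape: "<TimestampType>:<epoch millis>\t<value>".
final class ConsoleTimestampLine {

    // Returns the timestamp type prefix, e.g. "CreateTime" or "LogAppendTime".
    static String type(String line) {
        return line.substring(0, colonIndex(line));
    }

    // Returns the timestamp as an Instant.
    static Instant timestamp(String line) {
        int colon = colonIndex(line);
        int tab = line.indexOf('\t', colon);
        return Instant.ofEpochMilli(Long.parseLong(line.substring(colon + 1, tab)));
    }

    // Locates the ':' that ends the type prefix and rejects lines
    // that do not match the assumed shape.
    private static int colonIndex(String line) {
        int colon = line.indexOf(':');
        if (colon < 0 || line.indexOf('\t', colon) < 0) {
            throw new IllegalArgumentException("unexpected line format: " + line);
        }
        return colon;
    }

    public static void main(String[] args) {
        String line = "CreateTime:1500000000000\thello";
        System.out.println(type(line) + " at " + timestamp(line));
    }
}
```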
API
A timestamp extractor extracts a timestamp from an instance of ConsumerRecord. It is configured with the timestamp.extractor property (renamed default.timestamp.extractor in later Kafka Streams releases).
ConsumerRecordTimestampExtractor
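The default extractor is ConsumerRecordTimestampExtractor, which extracts the built-in timestamp. Later Kafka Streams releases use FailOnInvalidTimestamp as the default; it also reads the built-in timestamp.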
The broker configuration log.message.timestamp.type (or the topic-level message.timestamp.type) controls which built-in timestamp is stored:
- CreateTime = event-time
- LogAppendTime = ingestion-time
WallclockTimestampExtractor
The WallclockTimestampExtractor returns the processing-time of events, that is, the current system time when the record is processed.
CustomTimestampExtractor
A custom TimestampExtractor retrieves the payload-time timestamp (i.e. the timestamp embedded in the payload of messages).
Example of a custom TimestampExtractor implementation:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Extracts the embedded timestamp of a record (giving you "event time" semantics).
public class MyEventTimeExtractor implements TimestampExtractor {

    // Since Kafka 0.10.2 the interface passes a second argument carrying the
    // partition's latest extracted timestamp. Drop it when targeting 0.10.0.x or 0.10.1.x.
    @Override
    public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
        // `Foo` is your own custom class, which we assume has a method that returns
        // the embedded timestamp (in milliseconds).
        Foo myPojo = (Foo) record.value();
        if (myPojo != null) {
            return myPojo.getTimestampInMillis();
        } else {
            // Kafka allows `null` as message value. How to handle such message values
            // depends on your use case. In this example, we decide to fall back to
            // wall-clock time (= processing-time).
            return System.currentTimeMillis();
        }
    }
}

You would then define the custom timestamp extractor in your Streams configuration as follows:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties settings = new Properties();
// On older releases (before 0.11) the constant is StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG.
settings.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyEventTimeExtractor.class.getName());
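The extractor example leaves the `Foo` class undefined. As a minimal sketch of what such a payload class might look like, the code below assumes a made-up text encoding of the form `<epoch millis>|<body>`. The encoding, the `parse` method, and the `getBody` accessor are assumptions for illustration; only `getTimestampInMillis` is taken from the example.

```java
// Hypothetical payload class; the "<epoch millis>|<body>" encoding is an assumption.
final class Foo {
    private final long timestampInMillis;
    private final String body;

    Foo(long timestampInMillis, String body) {
        this.timestampInMillis = timestampInMillis;
        this.body = body;
    }

    // Parses the assumed "<epoch millis>|<body>" text encoding.
    static Foo parse(String payload) {
        int separator = payload.indexOf('|');
        if (separator < 0) {
            throw new IllegalArgumentException("missing '|' separator: " + payload);
        }
        long millis = Long.parseLong(payload.substring(0, separator));
        return new Foo(millis, payload.substring(separator + 1));
    }

    // The embedded (payload-time) timestamp in milliseconds since the epoch.
    long getTimestampInMillis() {
        return timestampInMillis;
    }

    String getBody() {
        return body;
    }

    public static void main(String[] args) {
        Foo foo = Foo.parse("1500000000000|order-42");
        System.out.println(foo.getTimestampInMillis() + " " + foo.getBody());
    }
}
```

In a real application the value would normally be deserialized by the configured Serde before the extractor sees it, so the extractor only has to call the accessor.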