About
The message timestamp is used to control the progress of streams. Depending on the application, it can be defined in different ways (see Source).
Source
The timestamp of a message can be retrieved from different sources.
Event-time
Event-time processing, aka “producer time”, is the default. It represents the time when the Kafka producer sent the original message.
Ingestion-time
Ingestion-time processing, aka “broker time”, uses the time when the Kafka broker received the original message.
Processing-time
The timestamp is the current time in milliseconds from the system clock. Kafka Streams then operates on the basis of the so-called processing-time of events.
Payload-time
The timestamp is embedded in the payload of the message.
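The four sources can be compared side by side. The following is a minimal sketch in plain Java with no Kafka dependency: the `TimestampSources` class, its `Message` type, the `TimeSemantics` enum and the `resolve` method are hypothetical names introduced for illustration, not Kafka APIs.

```java
import java.util.function.LongSupplier;

// Hypothetical illustration: none of these types are part of the Kafka API.
class TimestampSources {

    enum TimeSemantics { EVENT_TIME, INGESTION_TIME, PROCESSING_TIME, PAYLOAD_TIME }

    // A simplified message that carries every timestamp candidate (epoch milliseconds).
    static final class Message {
        final long producerSentAt;   // set by the producer (event-time)
        final long brokerReceivedAt; // set by the broker (ingestion-time)
        final long payloadTimestamp; // embedded in the message body (payload-time)

        Message(long producerSentAt, long brokerReceivedAt, long payloadTimestamp) {
            this.producerSentAt = producerSentAt;
            this.brokerReceivedAt = brokerReceivedAt;
            this.payloadTimestamp = payloadTimestamp;
        }
    }

    // Returns the timestamp that the chosen semantics would assign to the message.
    // The clock is injected so that processing-time can be made deterministic.
    static long resolve(Message m, TimeSemantics semantics, LongSupplier clock) {
        switch (semantics) {
            case EVENT_TIME:
                return m.producerSentAt;
            case INGESTION_TIME:
                return m.brokerReceivedAt;
            case PAYLOAD_TIME:
                return m.payloadTimestamp;
            case PROCESSING_TIME:
                return clock.getAsLong();
            default:
                throw new IllegalArgumentException("Unknown semantics: " + semantics);
        }
    }

    public static void main(String[] args) {
        Message m = new Message(1_000L, 1_050L, 900L);
        for (TimeSemantics s : TimeSemantics.values()) {
            System.out.println(s + " -> " + resolve(m, s, System::currentTimeMillis));
        }
    }
}
```

Only processing-time depends on when the message is read. The other three values travel with the message, so they stay the same when a topic is reprocessed.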
Management
Set
Built-in
Built-in timestamps are automatically embedded into Kafka messages by the Kafka producer client (since Kafka 0.10.0.0):
- event-time aka “producer time” (default)
- ingestion-time aka “broker time”
Get
kafka-console-consumer
kafka-console-consumer prints the timestamp of each message when the property print.timestamp is set to true:
kafka-console-consumer.sh \
....
--property print.timestamp=true \
.....
API
A timestamp extractor, configured with the timestamp.extractor property, extracts a timestamp from an instance of ConsumerRecord.
ConsumerRecordTimestampExtractor
The default extractor is ConsumerRecordTimestampExtractor. It extracts the built-in timestamp.
The value of the configuration log.message.timestamp.type controls which semantics this built-in timestamp has:
- CreateTime = event-time
- LogAppendTime = ingestion-time
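The effect of this setting on the stored timestamp can be sketched as follows. This is a simplified model in plain Java, not broker code: `BrokerTimestampSketch` and `storedTimestamp` are hypothetical names, and the broker clock is passed in as a parameter. It assumes that with CreateTime the broker keeps the timestamp set by the producer, and that with LogAppendTime it overwrites it with its own local time.

```java
// Hypothetical model of how the timestamp type decides which value is stored.
class BrokerTimestampSketch {

    // timestampType: the value of log.message.timestamp.type
    // producerTimestamp: timestamp set by the producer (epoch milliseconds)
    // brokerNow: broker clock at append time (epoch milliseconds)
    static long storedTimestamp(String timestampType, long producerTimestamp, long brokerNow) {
        switch (timestampType) {
            case "CreateTime":
                return producerTimestamp; // event-time: keep the producer's value
            case "LogAppendTime":
                return brokerNow;         // ingestion-time: use the broker's clock
            default:
                throw new IllegalArgumentException("Unknown timestamp type: " + timestampType);
        }
    }

    public static void main(String[] args) {
        System.out.println(storedTimestamp("CreateTime", 1_000L, 1_050L));    // prints 1000
        System.out.println(storedTimestamp("LogAppendTime", 1_000L, 1_050L)); // prints 1050
    }
}
```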
WallclockTimestampExtractor
The WallclockTimestampExtractor returns the processing-time of events, i.e. the current time of the system clock.
CustomTimestampExtractor
A custom TimestampExtractor retrieves the payload-time timestamp (i.e. the timestamp embedded in the payload of messages).
Example of a custom TimestampExtractor implementation:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Extracts the embedded timestamp of a record (giving you "event time" semantics).
// Note: this matches the single-argument interface of Kafka 0.10.0/0.10.1. Later
// Kafka Streams releases add a second `long` parameter to `extract`.
public class MyEventTimeExtractor implements TimestampExtractor {

    @Override
    public long extract(ConsumerRecord<Object, Object> record) {
        // `Foo` is your own custom class, which we assume has a method that returns
        // the embedded timestamp (in milliseconds).
        Foo myPojo = (Foo) record.value();
        if (myPojo != null) {
            return myPojo.getTimestampInMillis();
        } else {
            // Kafka allows `null` as message value. How to handle such message values
            // depends on your use case. In this example, we decide to fall back to
            // wall-clock time (= processing-time).
            return System.currentTimeMillis();
        }
    }
}
You would then define the custom timestamp extractor in your Streams configuration as follows:
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
Properties settings = new Properties();
settings.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyEventTimeExtractor.class.getName());
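The same kind of setting can also be written with the raw property key. The sketch below uses only java.util.Properties, so it compiles without Kafka on the classpath. It assumes the key is timestamp.extractor (the property named in the API section) and uses the fully qualified class name of WallclockTimestampExtractor as the value. `ExtractorConfigSketch` and `wallclockSettings` are hypothetical names, and newer Kafka Streams releases may expect a different key, so check the StreamsConfig documentation for your version.

```java
import java.util.Properties;

// Hypothetical helper: builds Streams settings that select processing-time semantics.
class ExtractorConfigSketch {

    static Properties wallclockSettings() {
        Properties settings = new Properties();
        // Assumed key: "timestamp.extractor", as named in the API section.
        settings.setProperty(
            "timestamp.extractor",
            "org.apache.kafka.streams.processor.WallclockTimestampExtractor");
        return settings;
    }

    public static void main(String[] args) {
        System.out.println(wallclockSettings().getProperty("timestamp.extractor"));
    }
}
```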