About
A Kafka producer is an object that consists of:
- a pool of buffer space that holds records that haven't yet been transmitted to the server
- a background I/O thread that is responsible for turning these records into requests and transmitting them to the cluster.
Kafka Connect uses it to communicate with Kafka.
Properties
A producer:
- has no group for coordination (unlike consumers).
- maps each message to a topic partition and sends a produce request to the leader of that partition.
- writes asynchronously by default. To make writes synchronous, wait on the future returned by send(), as in the sketch below.
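For example, a minimal sketch (assuming an already-configured Producer<String, String> named producer and a topic my-topic; the enclosing method must handle the checked exceptions of get()):

import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// send() is asynchronous: it returns immediately with a Future
Future<RecordMetadata> future =
    producer.send(new ProducerRecord<>("my-topic", "key", "value"));
// Waiting on the future makes the write synchronous
RecordMetadata metadata = future.get(); // throws ExecutionException if the send failed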
Management
Configuration
Full list of configuration settings
In Kafka Connect, the producer configs can be overridden in the worker configuration by prefixing them with producer.ated For example:
producer.retries=1
Minimal configuration:
Configuration | Desc |
---|---|
bootstrap.servers | A list of host/port pairs used to discover the rest of the cluster |
client.id | An id string passed to the broker to identify the client (used in logging, metrics, and quotas) |
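A minimal sketch of these settings (the broker address and client id are assumed values):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
props.put("client.id", "my-producer");            // assumed client id

Producer<String, String> producer =
    new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());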
Partitioner
The built-in partitioner selects a partition using a hash function. All messages with:
- the same non-empty key will be sent to the same partition
- a null (empty) key will be sent in a round-robin fashion to ensure an even distribution across the topic partitions.
A custom partitioner may be created for a semantic partitioning function (for instance, by month based on some key in the record), as sketched below.
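A sketch of such a partitioner (the month-from-key logic is a hypothetical example; it assumes keys are "YYYY-MM-..." strings):

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class MonthPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        // Hypothetical convention: extract "MM" from a "YYYY-MM-..." key
        int month = Integer.parseInt(((String) key).substring(5, 7));
        return (month - 1) % cluster.partitionCountForTopic(topic);
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}

It would be registered with props.put("partitioner.class", MonthPartitioner.class.getName()).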
Batch
Configuration | Desc |
---|---|
batch.size | The maximum batch size in bytes per partition. Larger = more batching, but requires more memory |
linger.ms | The producer will delay sending, waiting up to that number of milliseconds for more records before sending a request (analogous to Nagle's algorithm in TCP) |
compression.type | The compression codec applied to batches. Larger batches mean a higher compression ratio |
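For example, a sketch that trades a little latency for better batching (the values are illustrative, not recommendations; props is a Properties object as above):

props.put("batch.size", 65536);        // 64 KB batches per partition
props.put("linger.ms", 10);            // wait up to 10 ms for more records to fill a batch
props.put("compression.type", "lz4");  // compress whole batches before sending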
Producer Queue (unsent messages)
Configuration | Desc |
---|---|
buffer.memory | Limits the total memory available for buffering unsent records |
max.block.ms | How long send() will block waiting for buffer space before raising an exception |
request.timeout.ms | Messages queued longer than this timeout are removed and an exception is raised |
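A sketch of the failure path when the buffer fills up (the values are illustrative):

import org.apache.kafka.common.errors.TimeoutException;

props.put("buffer.memory", 33554432);  // 32 MB of total buffer space (the default)
props.put("max.block.ms", 5000);       // block send() for at most 5 s waiting for space
try {
    producer.send(new ProducerRecord<>("my-topic", "key", "value"));
} catch (TimeoutException e) {
    // No buffer space (or metadata) became available within max.block.ms
}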
Acknowledgement / Message Durability
Each produce request to the partition leader can be held until the replicas have successfully acknowledged the write. This gives control over the trade-off between message durability and overall throughput.
Messages which were acknowledged by the leader only (i.e. acks=1) can be lost if the partition leader fails before the replicas have copied the message.
Value | Requires |
---|---|
0 | No guarantee that the message was successfully written to the broker's log; the broker does not send a response (maximum throughput) |
1 | An explicit acknowledgement from the partition leader that the write succeeded |
all | Not only that the partition leader accepted the write, but that it was successfully replicated to all of the in-sync replicas |
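For example, requiring acknowledgement from all in-sync replicas (note that acks is a string-valued property):

props.put("acks", "all"); // strongest durability; "0" and "1" trade durability for throughput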
Request retry
If a request fails, the producer can automatically retry. Enabling retries also opens up the possibility of duplicates. See message delivery semantics
This behavior is controlled by the retries configuration property.
With retries=0, the producer will not retry failed requests.
In idempotent mode, retries will not introduce duplicates.
Message Ordering
Without retries enabled, the broker will preserve the order of writes it receives, but there could be gaps due to individual send failures.
With retries > 0, message reordering can occur: a retried batch may be written after a later batch that succeeded on its first attempt. To enable retries without reordering, set max.in.flight.requests.per.connection to 1, ensuring that only one request can be sent to the broker at a time, as in the sketch below.
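A sketch of retry settings that preserve ordering (the retry count is illustrative):

props.put("retries", 3);                                // retry failed sends up to 3 times
props.put("max.in.flight.requests.per.connection", 1); // one outstanding request preserves order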
Idempotent and transactional Mode
Since Kafka 0.11, the producer supports two additional modes:
- the idempotent producer
- and the transactional producer.
Idempotent
The idempotent producer strengthens Kafka's delivery semantics from at least once to exactly once delivery. The producer can only guarantee idempotence for messages sent within a single session.
In this mode, retries will no longer introduce duplicates.
To enable idempotence, set enable.idempotence=true. When it is set, the following properties are set automatically:
- retries=Integer.MAX_VALUE
- acks=all.
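A minimal sketch:

props.put("enable.idempotence", true);
// retries (Integer.MAX_VALUE) and acks ("all") are then applied automatically;
// explicitly overriding them with incompatible values fails at configuration time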
Transactional
The transactional producer allows an application to send messages to multiple partitions (and topics) atomically.
To enable the transactional mode, the following properties must be set:
- transactional.id
- replication.factor=3
- min.insync.replicas=2
- consumers must read only committed messages (isolation.level=read_committed)
The purpose of the transactional.id is to enable transaction recovery across multiple sessions of a single producer instance.
Example:
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");
Producer<String, String> producer =
    new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

// Fences off any previous producer instance with the same transactional.id
producer.initTransactions();

try {
    producer.beginTransaction();
    for (int i = 0; i < 100; i++)
        producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
    // All 100 sends become visible atomically on commit
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // We can't recover from these exceptions, so our only option is to close the producer and exit.
    producer.close();
} catch (KafkaException e) {
    // For all other exceptions, just abort the transaction and try again.
    producer.abortTransaction();
}
producer.close();
API and Example
http://kafka.apache.org/10/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
Utility
kafka-console-producer and kafka-avro-console-producer are command line tools that read data from standard input and write it to a Kafka topic.
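For example (the topic name and broker address are assumptions):

kafka-console-producer --broker-list localhost:9092 --topic my-topic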