About

A Kafka producer is an object that consists of:

  • a pool of buffer space that holds records that haven't yet been transmitted to the server
  • a background I/O thread that is responsible for turning these records into requests and transmitting them to the cluster.

Kafka Connect uses it to communicate with Kafka.

Properties

A producer:

  • has no group for coordination.
  • maps each message to a topic partition and sends a produce request to the leader of that partition.
  • writes asynchronously by default. To make writes synchronous, just wait on the returned future.

Management

Configuration

Full list of configuration settings

The configs can be overridden by prefixing them with "producer." (including the trailing dot). For example:

producer.retries=1

Minimal configuration:

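Configuration       Description
Producer Registry
bootstrap.servers   A list of host:port servers used to discover the cluster
client.id           An id string passed to the brokers with each request so that the client can be identified in server-side logs (it is not used for authentication)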
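
As a rough illustration, the minimal settings above can be collected in a java.util.Properties object before it is handed to a producer. The class name MinimalProducerConfig, the host names and the client id are placeholders invented for this sketch. The two serializer entries are not listed in the table; a producer also needs them unless serializers are passed to its constructor.

```java
import java.util.Properties;

final class MinimalProducerConfig {
    // Builds the minimal producer settings. All values are illustrative placeholders.
    static Properties build(String bootstrapServers, String clientId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("client.id", clientId);
        // Not in the table above: required unless serializers are given to the constructor.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("broker1:9092,broker2:9092", "orders-service");
        props.list(System.out);
    }
}
```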

Partitioner

The built-in partitioner selects a partition using a hash function. All messages with:

  • the same non-empty key will be sent to the same partition
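  • a null (empty) key will be sent in a round-robin fashion to ensure an even distribution across the topic partitions (clients from Kafka 2.4 onward instead fill a batch for one partition before moving to the next).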

A custom partitioner may be created for a semantic partition function (for instance by month based on some key in the record).
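
The selection logic described above can be sketched in plain Java. This is a simplified model, not the client's implementation: PartitionerSketch and its methods are hypothetical names, and Arrays.hashCode stands in for the murmur2 hash that the real client applies to the serialized key. The second method shows a semantic partition function of the "by month" kind.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

final class PartitionerSketch {
    private final AtomicInteger counter = new AtomicInteger();

    // Keyed records: hash the key bytes and map the hash onto a partition.
    // Null keys: rotate through the partitions (round-robin).
    int partition(String key, int numPartitions) {
        if (key == null) {
            return Math.floorMod(counter.getAndIncrement(), numPartitions);
        }
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        // Stand-in hash; the real client uses murmur2 here.
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }

    // Semantic partitioning: month 1..12 maps to partition 0..11,
    // wrapping around when the topic has fewer than 12 partitions.
    static int partitionByMonth(int month, int numPartitions) {
        return Math.floorMod(month - 1, numPartitions);
    }
}
```

Because the partition is derived from the hash modulo the partition count, the same key only keeps landing on the same partition while the number of partitions stays unchanged.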

Batch

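Configuration      Description
batch.size         The maximum size, in bytes, of a batch of records for one partition. Larger values mean more batching but require more memory
linger.ms          How long the producer waits, in milliseconds, for more records to arrive before sending a request, so that they can be batched together. Analogous to Nagle's algorithm in TCP
compression.type   The compression codec applied to whole batches (for example gzip, snappy or lz4). Larger batches mean a higher compression ratio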
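
How batch.size and linger.ms interact can be pictured with a small decision function. This is a deliberately simplified model under the assumption that a batch leaves the producer as soon as it is full or has waited long enough; BatchingSketch and readyToSend are hypothetical names, not part of the Kafka API.

```java
final class BatchingSketch {
    // A batch is sent once it has reached batch.size bytes
    // or once it has waited linger.ms, whichever comes first.
    static boolean readyToSend(int batchBytes, int batchSize, long waitedMs, long lingerMs) {
        boolean full = batchBytes >= batchSize;
        boolean lingerExpired = waitedMs >= lingerMs;
        return full || lingerExpired;
    }
}
```

With linger.ms set to 0 the second condition is always true, so records are sent as soon as the I/O thread can take them and batching only happens under load.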

Producer Queue (unsent messages)

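Configuration        Description
buffer.memory        Limits the total memory the producer can use to buffer unsent records
max.block.ms         How long the producer keeps trying to insert a record into the queue before raising an exception
request.timeout.ms   Messages older than this timeout are removed from the queue and an exception is raised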
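
The blocking behaviour of a full queue can be modelled with a bounded queue from java.util.concurrent. This is an analogy only: BoundedSendQueueSketch is a hypothetical class, the capacity here counts records whereas buffer.memory counts bytes, and the exception type is chosen for the sketch rather than taken from the Kafka client.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

final class BoundedSendQueueSketch {
    private final BlockingQueue<String> unsent;
    private final long maxBlockMs;

    BoundedSendQueueSketch(int capacity, long maxBlockMs) {
        this.unsent = new ArrayBlockingQueue<>(capacity);
        this.maxBlockMs = maxBlockMs;
    }

    // Waits up to maxBlockMs for free space, then gives up with an exception.
    void enqueue(String record) {
        boolean accepted;
        try {
            accepted = unsent.offer(record, maxBlockMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for queue space", e);
        }
        if (!accepted) {
            throw new IllegalStateException("queue still full after " + maxBlockMs + " ms");
        }
    }

    int size() {
        return unsent.size();
    }
}
```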

Acknowledgement / Message Durability

Each produce request to the partition leader can be held until the replicas have successfully acknowledged the write. This gives control over the trade-off between message durability and overall throughput.

Messages which were acknowledged by the leader only (i.e. acks=1) can be lost if the partition leader fails before the replicas have copied the message.

Value   Guarantee
0       No guarantee that the message was successfully written to the broker's log: the broker does not send a response. Maximum throughput.
1       An explicit acknowledgement from the partition leader that the write succeeded.
all     Not only did the partition leader accept the write, but it was also successfully replicated to all of the in-sync replicas.
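
One way to read the table is as the number of replicas that must have the write before the producer gets its answer. The following sketch encodes that reading; AcksSketch and requiredAcks are hypothetical names, and the leader is counted as one of the in-sync replicas. The value -1 is accepted as a synonym for all.

```java
final class AcksSketch {
    // Number of replicas that must hold the write before the producer is answered.
    // With acks=0 the broker sends no response at all, so nothing is awaited.
    static int requiredAcks(String acks, int inSyncReplicas) {
        switch (acks) {
            case "0":
                return 0;
            case "1":
                return 1; // the partition leader only
            case "all":
            case "-1":
                return inSyncReplicas; // leader plus every other in-sync replica
            default:
                throw new IllegalArgumentException("unknown acks value: " + acks);
        }
    }
}
```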

Request retry

If a request fails, the producer can automatically retry. Enabling retries also opens up the possibility of duplicates. See message delivery semantics.

This behavior is controlled by the retries configuration property.

With retries=0, the producer does not retry.

In idempotent mode, retries will not introduce duplicates.

Message Ordering

Without retries enabled, the broker will preserve the order of writes it receives, but there could be gaps due to individual send failures.

If retries > 0, message reordering can occur: a failed batch may be retried after a later batch has already been written. To enable retries without reordering, you can set max.in.flight.requests.per.connection to 1 to ensure that only one request can be sent to the broker at a time.
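
The effect of retries on ordering can be reproduced with a toy simulation. It is a sketch under simplifying assumptions: batches are sent in rounds of at most maxInFlight, a batch listed in failOnce fails on its first attempt only, and a failed batch is retried in the next round. OrderingSketch and deliver are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class OrderingSketch {
    // Returns the order in which batches 1..batchCount end up in the log.
    static List<Integer> deliver(int batchCount, int maxInFlight, Set<Integer> failOnce) {
        Deque<Integer> pending = new ArrayDeque<>();
        for (int i = 1; i <= batchCount; i++) {
            pending.add(i);
        }
        Set<Integer> alreadyFailed = new HashSet<>();
        List<Integer> log = new ArrayList<>();
        while (!pending.isEmpty()) {
            // Up to maxInFlight batches are on the wire at the same time.
            List<Integer> inFlight = new ArrayList<>();
            while (inFlight.size() < maxInFlight && !pending.isEmpty()) {
                inFlight.add(pending.poll());
            }
            List<Integer> retries = new ArrayList<>();
            for (int batch : inFlight) {
                if (failOnce.contains(batch) && alreadyFailed.add(batch)) {
                    retries.add(batch); // first attempt failed, retry in the next round
                } else {
                    log.add(batch);     // the broker appended the batch
                }
            }
            // Failed batches go back to the front of the queue in their original order.
            for (int i = retries.size() - 1; i >= 0; i--) {
                pending.addFirst(retries.get(i));
            }
        }
        return log;
    }
}
```

With three batches, batch 1 failing once and maxInFlight set to 2, batch 2 is written while batch 1 is still waiting for its retry, so the log order is 2, 1, 3. With maxInFlight set to 1 the order stays 1, 2, 3.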

Idempotent and transactional Mode

Since Kafka 0.11, the producer supports two additional modes:

  • the idempotent producer
  • and the transactional producer.

Idempotent

The idempotent producer strengthens Kafka's delivery semantics from at least once to exactly once delivery. The producer can only guarantee idempotence for messages sent within a single session.

Retries will no longer introduce duplicates.

To enable idempotence, set enable.idempotence=true. When it is set, the following properties take these defaults:

  • retries=Integer.MAX_VALUE
  • acks=all.
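
Why retries stop producing duplicates can be illustrated with a toy log that remembers the last sequence number written by each producer id. This is a simplified model of the idea, not the broker's code: IdempotentLogSketch is a hypothetical class, it handles a single partition, and it does not model the detection of gaps in the sequence.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class IdempotentLogSketch {
    private final List<String> log = new ArrayList<>();
    private final Map<Long, Integer> lastSequence = new HashMap<>();

    // Appends the record unless this producer id already wrote that sequence number.
    // Returns false when the record is recognised as a retried duplicate.
    boolean append(long producerId, int sequence, String record) {
        int last = lastSequence.getOrDefault(producerId, -1);
        if (sequence <= last) {
            return false; // duplicate caused by a retry: do not append again
        }
        log.add(record);
        lastSequence.put(producerId, sequence);
        return true;
    }

    List<String> records() {
        return log;
    }
}
```

A producer that restarts without a transactional.id comes back with a new producer id, so the model cannot recognise its earlier writes. This matches the statement that idempotence is only guaranteed within a single session.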

Transactional

The transactional producer allows an application to send messages to multiple partitions (and topics) atomically.

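To enable transactional mode, set transactional.id on the producer; this also enables idempotence. For the transactional guarantees to hold end to end, the following should also be in place:

  • the topics involved have a replication.factor of at least 3
  • the topics involved have min.insync.replicas=2
  • the consumers read only committed messages (isolation.level=read_committed)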

The purpose of the transactional.id is to enable transaction recovery across multiple sessions of a single producer instance.

Example:

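import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("transactional.id", "my-transactional-id");
        Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

        // Must be called once, before the first transaction.
        producer.initTransactions();

        try {
            producer.beginTransaction();
            for (int i = 0; i < 100; i++)
                producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
            // The 100 records are committed together, or not at all.
            producer.commitTransaction();
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // We can't recover from these exceptions, so our only option is to close the producer and exit.
            producer.close();
        } catch (KafkaException e) {
            // For all other exceptions, just abort the transaction and try again.
            producer.abortTransaction();
        }
        producer.close();
    }
}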

API and Example

http://kafka.apache.org/10/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

Utility

kafka-console-producer and kafka-avro-console-producer are command line tools that read data from standard input and write it to a Kafka topic.