Apache Kafka is a distributed log storage platform, commonly used for building real-time data pipelines and streaming applications. It is horizontally scalable, fault-tolerant and very fast. As a trade-off, it is append-only and offers only a single index, based on message time.
At Yieldlab, each ad request causes one or more auctions to be held. The resulting auction logs are published to Kafka. These logs are then processed by Apache Flink, a stream processing platform, to generate billing reports, analytics data and other metrics. That in turn makes Kafka a mission-critical system: any problem with it directly affects our downstream systems.
We operate a Kafka cluster of 9 brokers. The volume of incoming logs is 200 MB/s in serialized form and 100 MB/s after compression. The auction log topic has 144 partitions. Keep in mind that the techniques described in this article are most effective for Kafka topics with a large number of partitions.
First we will explain the default Kafka partitioning strategy, then the problem we encountered and how we detected it. Finally, we will show the solution and its effects.
Background
By default Kafka uses org.apache.kafka.clients.producer.internals.DefaultPartitioner as the partitioning strategy. Taken from its javadoc:
The default partitioning strategy:
- If a partition is specified in the record, use it
- If no partition is specified but a key is present choose a partition based on a hash of the key
- If no partition or key is present choose a partition in a round-robin fashion
Looking at the source code, it uses the Murmur2 hash for keyed records and round robin when there is no key. This article is about the latter; for keyed streams the approach described here has no effect, positive or negative.
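To make the distinction concrete, here is a minimal sketch (the topic name, broker address and serializers are placeholders, not our actual setup): records sent without a key go through the round-robin path, while keyed records are always hashed to the same partition.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class DefaultPartitioningSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // No key: the DefaultPartitioner falls back to round robin.
            producer.send(new ProducerRecord<>("auction-logs", "auction payload"));

            // With a key: partition = murmur2(key) % numPartitions,
            // so the same key always lands on the same partition.
            producer.send(new ProducerRecord<>("auction-logs", "auction-42", "auction payload"));
        }
    }
}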
Problem
The round-robin partitioner distributes records across all partitions, and therefore all brokers, uniformly. Because each batch targets a single partition, this spreads records over many small batches and creates too many ProducerRequests. This can be seen in the “requests-in-flight” metric:

By default the ProducerConfig max.in.flight.requests.per.connection is 5. It is defined as “the maximum number of unacknowledged requests the client will send on a single connection before blocking”. This cluster has 9 brokers and the KafkaProducer opens a connection to every broker, hence 9 × 5 = 45. But there is another key phrase in the config description: “before blocking”. Blocking sounds ominous, so let’s look at the next parameters:
max.block.ms: “controls how long KafkaProducer.send() and KafkaProducer.partitionsFor() will block”. By default it is 60 seconds; in our setup we lowered it to 10 seconds. That means the KafkaProducer#send method can block for up to max.block.ms, which of course increases latency.
request.timeout.ms: “the maximum amount of time the client will wait for the response of a request”. By default it is 30 seconds; in our setup it is 120 seconds.
Let’s look at some metrics:


We are very close to losing messages, and the producer uses more than 100 MB for buffers. We are hit harder by this problem than usual because the records we send vary greatly in size.

There is high variation in record size. But why is this a problem? The Kafka producer sends records in batches. Each batch has a maximum size of batch.size, which is 16 KB by default; in our setup it is set to 500 KB since our records are larger. Each batch is sent as a single ProducerRequest to a single partition, and as you remember the number of in-flight requests was capped at 45. When a large record arrives and batch.size would be exceeded, the existing batch is sent immediately and the new record goes into the next batch. This leads to partial, ineffective batches, which increases the number of ProducerRequests and reduces the effectiveness of compression. The effect is visible when comparing the batch-size-avg and batch-size-max producer metrics: a large gap between them is an important warning sign to watch for.
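If you want to check this gap programmatically rather than through your monitoring stack, a rough sketch using KafkaProducer#metrics() could look like this (the helper class and its logging are purely illustrative):

import org.apache.kafka.clients.producer.KafkaProducer;

public class BatchSizeCheck {
    // Print the producer's batch-size metrics; in practice these are usually
    // scraped via JMX by the monitoring system instead of logged by hand.
    static void logBatchSizes(KafkaProducer<?, ?> producer) {
        producer.metrics().forEach((name, metric) -> {
            if ("producer-metrics".equals(name.group())
                    && ("batch-size-avg".equals(name.name()) || "batch-size-max".equals(name.name()))) {
                // A large gap between avg and max hints at partial, inefficient batches.
                System.out.println(name.name() + " = " + metric.metricValue());
            }
        });
    }
}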

Solution(s)
How can we solve this issue before it becomes critical? The in-flight request limit has become a bottleneck, leading to high latency and high memory usage. We want to send more efficient producer requests, and fewer of them in total.
Break down records
One approach is to break down the records. Kafka was originally built for collecting metrics at LinkedIn, and by default it is tuned for small, fixed-size records. It can handle large messages after some tuning, but high variation in record size makes the producer inefficient. It is better to split the records across different topics and join them in downstream jobs, keeping in mind which fields those jobs actually need. This also improves the performance of the downstream jobs and reduces the amount of data sent over the network. But this solution is not always feasible, and we could not apply it in our case.
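Even though we did not take this route, here is an illustration of the idea (topic names, key and payloads are hypothetical, not our actual schema): publish a slim record with the commonly needed fields and a separate bulky record under the same key, and join the two streams downstream.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SplitRecordSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            String auctionId = "auction-42"; // shared key so downstream jobs can join the two streams
            // Slim record: only the fields every downstream job needs.
            producer.send(new ProducerRecord<>("auction-core", auctionId, "{\"price\":1.23,\"slot\":\"728x90\"}"));
            // Bulky record: the large payload only a few jobs need.
            producer.send(new ProducerRecord<>("auction-details", auctionId, "{\"bidResponses\":\"...\"}"));
        }
    }
}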
Optimize record distribution
Instead of using round robin, we can send records to the same partition and switch the target partition based on time. For this we implemented the TimedPartitioner:
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

public class TimedPartitioner implements Partitioner {

    private int periodInMs = 1000;
    private int seed = ThreadLocalRandom.current().nextInt();

    public TimedPartitioner() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // Reuse linger.ms as the partition-switching period if it is set as an integer.
        Object lingerMs = configs.get("linger.ms");
        if (lingerMs instanceof Integer) {
            periodInMs = (Integer) lingerMs;
        }
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            // Non-keyed records: pick the partition for the current time bucket.
            int nextValue = this.nextValue();
            List<PartitionInfo> available = cluster.availablePartitionsForTopic(topic);
            if (available.size() > 0) {
                int part = Utils.toPositive(nextValue) % available.size();
                return available.get(part).partition();
            } else {
                // No partition is currently available, fall back to all partitions.
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // Keyed records: hash the key, same as the default partitioner.
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    private int nextValue() {
        // Bucket the current time into periods of periodInMs and add a per-producer
        // random seed so that all producers do not pick the same bucket sequence.
        return ((int) (System.currentTimeMillis() / periodInMs)) + seed;
    }

    @Override
    public void close() {
    }
}
As you can see, this is very similar to the default partitioner, but nextValue is derived from System.currentTimeMillis(), bucketed into periods of periodInMs milliseconds. Every record produced within one period goes to a single partition. As a good starting point, periodInMs is set to the same value as linger.ms. Also note that a random seed is added in nextValue so that different producers, on this server or on others, do not all overload the same broker at the same time.
While the default partitioner scatters records across all partitions, this partitioner sends all messages produced within a given time period to the same partition. This decreases the number of ProducerRequests and increases the efficiency of batches.
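Wiring it in is just a matter of setting partitioner.class. A minimal setup could look like the sketch below; the broker address and serializers are placeholders, and TimedPartitioner is assumed to be on the producer's classpath.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class TimedPartitionerSetup {
    static KafkaProducer<byte[], byte[]> newProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        // Replace the DefaultPartitioner with the time-bucketed partitioner.
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, TimedPartitioner.class.getName());
        // TimedPartitioner picks up linger.ms in configure() and uses it as its period.
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1000);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 500 * 1024); // 500 KB, as in our setup
        return new KafkaProducer<>(props);
    }
}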
Let’s see the metrics after changing the partition strategy.





Conclusion
Kafka is very good at what it does, but as the data grows it requires cooperation from the developers. In this use case we benefited significantly, with a minimum amount of hassle, just by swapping the default partitioner for one better suited to our workload. Different use cases will require different solutions, but one thing should always be constant for success: check your metrics. Be data-driven: collect, monitor, understand and analyze your applications and services.