Kafka Consumer Performance Optimization: Thread-Per-Consumer vs Multi-Threaded Processing Guide
System Design

Kafka Consumer Performance Optimization: Thread-Per-Consumer vs Multi-Threaded Processing Guide

2025-05-1
5 min read

Multithreading is "the ability of a central processing unit (CPU) (or a single core in a multi-core processor) to provide multiple threads of execution concurrently, supported by the operating system." In situations where the work can be divided into smaller units, which can be run in parallel, without negative effects on data consistency, multithreading can be used to improve application performance.

In Kafka topics, records are grouped into smaller units—partitions, which can be processed independently without compromising the correctness of the results and lays the foundations for parallel processing. This is usually achieved by scaling: using multiple consumers within the same group, each processing data from a subset of topic partitions and running in a single thread.

Thread per Consumer Model

For most use cases, reading and processing messages in a single thread is perfectly fine, so it's not surprising that thread per consumer threading model is commonly used with the Apache Kafka® consumer. When processing doesn't involve I/O operations, it's usually very fast, so the poll loop runs smoothly. While this model has many benefits, especially with regard to the simplicity of client code, it also has limitations that can cause problems in some use cases.

Basic Implementation

A typical single-threaded implementation is centered around a poll loop:

while (true) { ConsumerRecords records = consumer.poll(Duration.ofMillis(10000)); // Handle fetched records }

Offset Management

With the default configuration, the consumer automatically stores offsets to Kafka. The auto.commit.interval.ms config controls the frequency of commits. While users might assume that offsets are committed in these intervals by a background thread, in reality:

  • Offsets are committed during the consumer's poll method execution
  • auto.commit.interval.ms only defines the minimum delay between commits
  • Only offsets of records returned in previous poll calls are committed
  • Since processing happens between poll calls, offsets of unprocessed records will never be committed
  • This guarantees at-least-once delivery semantics

Group Rebalancing

Consumer group rebalancing is triggered when:

  • A new consumer joins the group
  • An existing consumer leaves the group
  • An existing consumer changes subscription
  • Partitions are added to one of the subscribed topics

Multi-Threaded Consumer Model

Motivation

While the thread-per-consumer model works well for many use cases, there are scenarios where a multi-threaded approach offers significant advantages:

  1. Slow Processing Issues

    • The max.poll.interval.ms config (default: 5 minutes) defines the maximum delay between poll calls
    • If a consumer fails to call poll within this interval, it's considered dead
    • This can be problematic when processing individual records takes a long time
  2. Handling Record Processing Exceptions

    • Different retry strategies might be needed
    • Some use cases require indefinite retries
    • Single-threaded model limits processing time

Implementation Approach

The multi-threaded solution uses runnable tasks executed by a thread pool for processing records. Here's a basic implementation:

public class Task implements Runnable { private final List<ConsumerRecord<String, String>> records; private volatile boolean stopped = false; private volatile boolean started = false; private final CompletableFuture<Long> completion = new CompletableFuture<>(); private volatile boolean finished = false; private final ReentrantLock startStopLock = new ReentrantLock(); private final AtomicLong currentOffset = new AtomicLong(-1); public Task(List<ConsumerRecord<String, String>> records) { this.records = records; } public void run() { startStopLock.lock(); if (stopped) { return; } started = true; startStopLock.unlock(); for (ConsumerRecord<String, String> record : records) { if (stopped) break; // process record here and make sure you catch all exceptions currentOffset.set(record.offset() + 1); } finished = true; completion.complete(currentOffset.get()); } // ... other methods for task management }

Key Implementation Considerations

  1. Partition Processing

    • Records from the same partition must be processed by only one thread
    • Use KafkaConsumer.pause() to prevent parallel processing of the same partition
    • Resume partitions only after current processing is complete
  2. Offset Management

    • Disable automatic offset commit
    • Commit offsets manually after successful processing
    • Track offsets per partition
  3. Group Rebalancing

    • Implement ConsumerRebalanceListener
    • Handle partition revocation properly
    • Ensure proper offset commits during rebalancing

Handling Rebalancing

public void onPartitionsRevoked(Collection<TopicPartition> partitions) { // 1. Stop all tasks handling records from revoked partitions Map<TopicPartition, Task> stoppedTasks = new HashMap<>(); for (TopicPartition partition : partitions) { Task task = activeTasks.remove(partition); if (task != null) { task.stop(); stoppedTasks.put(partition, task); } } // 2. Wait for stopped tasks to complete processing stoppedTasks.forEach((partition, task) -> { long offset = task.waitForCompletion(); if (offset > 0) offsetsToCommit.put(partition, new OffsetAndMetadata(offset)); }); // 3. Commit offsets for revoked partitions try { consumer.commitSync(revokedPartitionOffsets); } catch (Exception e) { log.warn("Failed to commit offsets for revoked partitions!"); } }

Conclusion

Implementing a multi-threaded consumer model offers significant advantages over the thread-per-consumer model for certain use cases. The key considerations are always:

  1. Ensure records from the same partitions are processed by only one thread at a time
  2. Commit offsets only after records are processed
  3. Handle group rebalancing properly

While more complex to implement, the multi-threaded approach provides better resource utilization and scalability for high-throughput applications.


Reference: Confluent Blog - Kafka Consumer Multi-Threaded Messaging

Cheers!

Yijie :)