Setting up a Kafka cluster, along with its producers and consumers, requires a deep understanding of how Kafka works and of the significance of specific configurations. Since there are already plenty of articles about how Kafka works, this post instead looks at some important configurations whose default values can be adjusted, using a fictitious Kafka-based platform as the example.
Below are some important Kafka Broker configurations:
- compression.type – Compression type for the Topic
- log.flush.interval.messages – Maximum number of messages before flushing
- log.flush.interval.ms – Maximum time a message is held in memory before flushing
- log.retention.bytes – Maximum log size before deletion
- log.retention.ms – Maximum log-retention time before deletion
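As a sketch, the broker settings above might appear in a broker's server.properties like this. The values shown are illustrative placeholders only, not recommendations:

```properties
# Illustrative server.properties fragment (values are placeholders)
compression.type=producer
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.bytes=1073741824
log.retention.ms=86400000
```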
Below are some important Kafka Topic configurations:
- num.partitions – Number of Partitions
- max.message.bytes – Largest allowed record batch size
- min.insync.replicas – Minimum number of replicas for acknowledgement
- retention.bytes – Maximum Partition size before deletion
- retention.ms – Maximum log-retention time
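These topic-level settings can be applied as per-topic overrides (at topic creation or later via the admin tools); num.partitions is the broker-side default for newly created topics. A sketch with placeholder values:

```properties
# Illustrative per-topic overrides (values are placeholders)
max.message.bytes=1048576
min.insync.replicas=2
retention.bytes=1073741824
retention.ms=86400000
```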
Below are some important Kafka Producer configurations:
- buffer.memory – Memory used to buffer records before sending them to the server
- compression.type – Compression Type for the data generated
- retries – Number of retries (in case of error) when sending
- batch.size – Batch size when sending multiple records
- linger.ms – Delay or latency added to increase the chances of batching
- max.request.size – Maximum size of a request, which effectively caps the number of records per send
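A producer configuration using these settings might look like the following sketch; every value here is a placeholder for illustration:

```properties
# Illustrative producer configuration (values are placeholders)
buffer.memory=33554432
compression.type=snappy
retries=3
batch.size=16384
linger.ms=5
max.request.size=1048576
```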
Below are some important Kafka Consumer configurations:
- fetch.min.bytes – Minimum amount of data per fetch request
- max.partition.fetch.bytes – Maximum amount of per-partition data to return
- fetch.max.bytes – Maximum amount of data per fetch request
- max.poll.records – Maximum number of records per poll request
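And the corresponding consumer side, again as an illustrative sketch with placeholder values:

```properties
# Illustrative consumer configuration (values are placeholders)
fetch.min.bytes=1
max.partition.fetch.bytes=1048576
fetch.max.bytes=52428800
max.poll.records=500
```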
Sample Use Case Scenario
Consider a fictitious company called ABC Inc. that has a platform for managing Employee Stock Options and Rewards (call it SORS). SORS manages Employee Stock Options data for ABC Inc.'s clients (e.g., Apple, Bob's, Acme, etc.) and runs a daily batch job that processes the clients' Stock Options data files. As with any other modern enterprise, ABC Inc. has invested heavily in replacing the old batch-based SORS system with a real-time system built on Kafka.
Above is ABC Inc.'s Kafka Broker cluster with 3 Brokers. SORS messages are produced by the Producer on the left and published to SORS_Topic in the middle, which is consumed by the Consumer Group ConsumerGroup1 on the right. (By default, creating a Kafka Topic results in 1 partition.)
Number of Partitions
The ideal number of partitions for a Topic should be equal to (or a multiple of) the number of brokers in a cluster. So, when creating the Topic, we must explicitly specify num.partitions=3. This is a 3-broker cluster, so one partition per broker distributes the load evenly. The number of partitions can only be increased, never decreased, so extra care must be taken to choose a sufficiently generous starting number.
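Creating such a topic could look like the following sketch. The bootstrap address is a placeholder, and the replication factor of 2 is an assumption for this scenario, not a prescription:

```shell
# Sketch: one partition per broker in the 3-broker cluster
# (localhost:9092 and the replication factor are illustrative assumptions)
kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --topic SORS_Topic \
  --partitions 3 \
  --replication-factor 2
```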
In ConsumerGroup1 above, you may wonder why there’s a dedicated consumer for Apple. Apple is a prominent customer of ABC Inc. and their message volume is huge when compared to other clients. To process Apple’s messages in a dedicated consumer, the right key should be chosen, so those messages land in their own, Apple-dedicated, partition. Key choice and the partition assignment are beyond the scope of this blog. Please refer to the Kafka Documentation for that.
The expected throughput of this consumer application is 20 MB/s, while a single consumer instance is limited to 10 MB/s. So, the required 20 MB/s is achieved by having two consumers in the group, each taking a 10 MB/s load.
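The consumer-group sizing above is a simple ceiling division; a quick sanity check, using only the numbers from the scenario:

```python
import math

target_throughput_mb_s = 20  # required throughput from the scenario
per_consumer_mb_s = 10       # limit of a single consumer instance

# A consumer group scales by adding consumers, up to the partition count.
consumers_needed = math.ceil(target_throughput_mb_s / per_consumer_mb_s)
print(consumers_needed)  # 2 consumers, each handling 10 MB/s
```

Note that adding a third consumer here would leave it idle, since the topic has only 3 partitions and one is dedicated to Apple's keyed messages.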
Assume each Broker has a capacity limit of 3 TB and Producers publish messages at a rate of 20 MB/s, which accumulates to 1,728 GB per day. Brokers also need additional capacity for other backend processes (e.g., replication, leader election, log cleaning, etc.). In production, the Consumers never lag by more than a few minutes, and messages that cannot be processed because of an error are written to another Topic that acts as a Dead Letter Queue. So, the retention strategy is set to 1 day or 2 TB.
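The 1,728 GB/day figure follows directly from the sustained produce rate:

```python
# Daily accumulation at a sustained produce rate of 20 MB/s
produce_rate_mb_s = 20
seconds_per_day = 24 * 60 * 60        # 86,400 seconds
daily_mb = produce_rate_mb_s * seconds_per_day
daily_gb = daily_mb / 1000
print(daily_gb)                       # 1728.0 GB/day, matching the scenario
```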
log.retention.bytes is a per-partition value, so setting it to 1 GB results in up to 3 GB retained for a Topic with 3 partitions.
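In other words, the effective topic-level retention cap is the per-partition value multiplied by the partition count:

```python
# log.retention.bytes applies per partition, not per topic
retention_bytes_per_partition = 1 * 1024**3   # 1 GB
partitions = 3
topic_retention_bytes = retention_bytes_per_partition * partitions
print(topic_retention_bytes // 1024**3)       # 3 GB across the whole topic
```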
Also, note that the background thread that clears out old log segments is triggered by whichever of log.retention.bytes or log.retention.ms is reached first.
Log Directory Recovery Threads
log.dirs and num.recovery.threads.per.data.dir should be tuned together. If log.dirs lists 2 log directories and num.recovery.threads.per.data.dir=7, then 14 threads will be used in total. Raising num.recovery.threads.per.data.dir does not add to the active runtime thread count, as these threads are used only during Broker startup and shutdown. For a quicker broker recovery after a crash, it is imperative to set this to a higher number.
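The two-directory, fourteen-thread example above would look like this in server.properties; the directory paths are hypothetical:

```properties
# Two log directories x seven recovery threads each = 14 threads at startup
# (the paths below are illustrative)
log.dirs=/data/kafka-logs-1,/data/kafka-logs-2
num.recovery.threads.per.data.dir=7
```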
The default maximum message size a producer can publish is 1 MB. This value can be modified using the broker configuration message.max.bytes. Care must be taken when modifying it, because the change must be reflected in the consumer-side fetch.message.max.bytes as well. Otherwise, setting message.max.bytes larger than fetch.message.max.bytes will result in bigger messages never being fetched by the consumer.
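The broker and consumer limits should therefore move together, as in this sketch. (Note: fetch.message.max.bytes belongs to the legacy consumer; the modern Java consumer uses max.partition.fetch.bytes and fetch.max.bytes for the same purpose. The 1 MB values are illustrative.)

```properties
# Broker side (server.properties): largest record batch the broker accepts
message.max.bytes=1048576
# Consumer side: must be at least as large, or oversized messages are never fetched
fetch.message.max.bytes=1048576
```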
For a cluster to retain 9 TB of data with an individual broker capacity of 3 TB, we need at least 3 Brokers. However, 3 × 3 = 9 TB is not enough on its own: replication consumes additional space (100% more with a replication factor of 2), and the replication factor together with acks=all has a significant impact on the total capacity requirement.
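A rough capacity calculation, assuming a replication factor of 2 (consistent with replication occupying 100% additional space as described above):

```python
import math

retained_data_tb = 9        # data the cluster must retain
replication_factor = 2      # assumption: one replica per partition
broker_capacity_tb = 3

total_tb = retained_data_tb * replication_factor          # 18 TB with replicas
brokers_needed = math.ceil(total_tb / broker_capacity_tb)
print(brokers_needed)  # 6 brokers, before any headroom for failures
```

This still leaves no headroom for broker failures, which the next section addresses.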
The broker itself runs in a JVM, and a 5 GB heap is sufficient for it. However, the process relies heavily on the OS page cache to buffer messages being written to disk, so care must be taken not to host other applications that would compete for the page cache used by Kafka's JVM.
In an ideal scenario, computing the broker size as in the previous section may look right. However, if a single broker crashes, the remaining two brokers have to host that Broker's partitions as well as the replicas it carried. In the worst case, if two brokers go down, all the partitions and their replicas end up on one broker. One partition per disk is the ideal approach. Rack awareness must also be considered when allocating partitions, so that a partition and its replica do not reside in the same rack.
I hope this post helped you design and scale a minimum viable Kafka application, based on the sample scenario provided. If your organization has any additional questions about setting up a Kafka cluster, please don't hesitate to reach out to us. We'd love to help you get started.