In order to guarantee ordered message delivery with the Kafka messaging broker, messages can be produced with a key. Messages with the same key are written to the same topic partition, and as consumers read messages in order from each partition, their ordering is preserved.
A Spring Boot application that demonstrates writing messages with keys and the impact on ordering accompanies this article. The full source code is available here.
Topic Partitions
Each Kafka topic is divided into one or more partitions. This means that when a producer is writing a message to a topic, it will write it to one of the partitions in the topic. Likewise a single consumer in a consumer group that is subscribed to the topic will consume from all the partitions.
Figure 1: Single producer and consumer
In this example, messages are being produced without a message key, so there is no guarantee as to which topic partition they will be written to. Related messages have been colour coded, and we see that messages relating to the red entity have been written to partitions 0 and 2. As a result, the consumer polling for batches of records from the partitions will receive these related messages in an unknown order. In this case, it may receive later red messages written to partition 2 before it receives the first red message written to partition 0.
Scalability
Topic partitions are a core aspect of the high scalability that the Kafka messaging broker provides. By increasing the number of partitions in a topic in line with the number of consumer instances in the same consumer group that are subscribed to the topic, throughput is increased. Whereas in figure 1 there is a single consumer reading from all three topic partitions, in figure 2 there are now multiple consumer instances reading from the topic. The topic partitions are divided up between the consumer instances, so the throughput has increased.
Figure 2: Multiple consumers
This diagram highlights the fact that related messages written across different partitions will be consumed and processed in an unknown order. Messages for the red entity will be consumed by two different consumer instances, those assigned to partition 0 and partition 2, so it is unknown which messages will be processed first.
In figure 2 there are four consumers, but with only three topic partitions the fourth consumer instance is idle, as it has no free partition to be assigned. This could be deemed a waste of resources, as the idle consumer instance is still using CPU and memory. On the other hand, it means that if one of the other consumer instances fails, the spare consumer would be assigned its partition on the resulting consumer group rebalance, minimising the impact of the failure.
While one option might be to increase the topic partition count to four so that all available consumer instances are assigned partitions, this is not a recommended practice. Topic partition counts should be carefully sized in advance to cater for current and expected data volumes. Increasing the number of partitions once a topic is in Production can result in a loss of ordering, and the number of partitions cannot be reduced, as data would be lost.
Message Keys
In order to guarantee message ordering, related messages that should be ordered must be written to the same topic partition. This is achieved by producing each message with a message key. A less common alternative to using a message key is for the producer itself to stipulate which partition to write a message to.
As with the message body, the key can be typed as required, such as a String, an Integer, or a JSON object. Likewise, as with the body, the key can have an Avro schema defined. The key is hashed by the producer, and this hash determines which partition the message is written to, so messages with the same key are always written to the same partition. If no key is included on the message, the producer instead chooses the partition itself, typically attempting to spread load evenly across the partitions.
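The key-to-partition mapping can be sketched as a deterministic hash modulo the partition count. This is a simplification for illustration only: Kafka's default partitioner actually applies murmur2 to the serialized key bytes rather than String.hashCode, but the guarantee it provides is the same, namely that identical keys always map to the same partition.

```java
public class KeyPartitionSketch {

    // Simplified stand-in for Kafka's key-based partitioning: any deterministic
    // hash guarantees that the same key always maps to the same partition.
    // (Kafka itself uses murmur2 over the serialized key bytes.)
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        int first = partitionFor("red-entity", 3);
        int second = partitionFor("red-entity", 3);
        // The mapping is repeatable, so ordering by key is preserved
        System.out.println("red-entity -> partition " + first
                + ", repeatable: " + (first == second));
    }
}
```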
Message Ordering
Returning to the above example, in the following diagram messages with four different keys, each representing a different entity, have been written to a topic with three partitions.
Figure 3: Producing keyed messages
This time all messages with the same key are written to the same partition. Of course any one partition may still hold messages for different keys, as we see with messages for the red and blue entities both being written to partition 0. Now, however, when a consumer receives messages from a partition, those messages relating to each entity will be in a guaranteed order.
Concurrent Consumers
Spring Kafka makes it straightforward to instantiate multiple instances of an application's Kafka consumer from within a single application deployment. Each of these consumer instances is treated as a separate consumer in the consumer group, so the topic partitions are assigned between them, as with any other consumers in the same consumer group. They operate concurrently, so if memory and CPU allow, this is another good option for increasing throughput.
In the accompanying Spring Boot application an example consumer, KafkaDemoConsumer, consumes messages from the topic demo-inbound-topic. The consumer is created using Spring's KafkaListenerContainerFactory, and this Spring bean is defined in KafkaDemoConfiguration. The ConcurrentKafkaListenerContainerFactory implementation can be configured to create multiple concurrent instances of each consumer it creates. In the example the concurrency has been configured to 3.
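A minimal sketch of how such a factory bean might be defined (the class and bean names follow the article; the demo's actual KafkaDemoConfiguration may differ in its generic types and consumer factory wiring):

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@Configuration
public class KafkaDemoConfiguration {

    // Each @KafkaListener created from this factory runs as three concurrent
    // consumer instances within the single application deployment
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(3);
        return factory;
    }
}
```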
The upshot is that when the application is started, three instances of the KafkaDemoConsumer are started. The assignment of the instances across the topic partitions can be viewed using the Kafka command line tools, specifically using the kafka-consumer-groups tool to describe the consumer group. The consumer group is named demo-consumer-group, as configured on the @KafkaListener annotation in the KafkaDemoConsumer.
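For example, the group can be described as follows (assuming the broker is reachable on localhost:9092):

```shell
# List each partition of the subscribed topics alongside the consumer
# instance it is currently assigned to
kafka-consumer-groups \
  --bootstrap-server localhost:9092 \
  --describe \
  --group demo-consumer-group
```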
In this example there are 10 topic partitions for the demo-inbound-topic, and a single instance of the demo Spring Boot application has been started. The partitions can be seen to have been assigned across the three concurrent consumer instances within the one consumer group. So long as related messages are written to the topic with the same key, they will be produced to the same partition, and hence consumed in order by the assigned consumer instance.
Producer Retries
It is common practice to configure a producer to retry writes that fail due to transient errors, such as network connection failures. By default, retries is set to the maximum integer value. However, retrying can impact message ordering guarantees, depending on other producer configuration values.
The configuration parameters of interest are the following:

Producer Config Param | Usage | Default
retries | Causes the producer to retry transient write failures | 2147483647
enable.idempotence | When true, ensures that each message is only ever written once | true (since Kafka 3.0)
max.in.flight.requests.per.connection | The maximum number of unacknowledged requests before the producer blocks | 5
Message ordering is no longer guaranteed if retries occur due to transient errors with the following combinations of configuration settings:

With enable.idempotence set to false and max.in.flight.requests.per.connection greater than 1, if the first batch request fails with a transient error and is retried, but a second in-flight batch is written successfully to the same topic partition before the first completes, the messages will have been written out of order.
With the second combination of configuration parameters, where enable.idempotence is set to true, an attempt to perform a duplicate write to Kafka would on the face of it be stopped. However this guarantee is lost if max.in.flight.requests.per.connection is greater than 5. The limit of 5 is the highest supported value for which a producer can remain idempotent. There is more detail on the idempotent producer in the article Kafka Idempotent Producer.
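Putting the discussion above together, a producer configured to retry safely without reordering might use the following settings. This is a sketch: the property names are the standard Kafka producer configs, and the values reflect the constraints just described.

```java
import java.util.Properties;

public class SafeProducerProps {

    // Producer settings under which retries cannot reorder or duplicate messages
    public static Properties build() {
        Properties props = new Properties();
        props.setProperty("enable.idempotence", "true");
        props.setProperty("max.in.flight.requests.per.connection", "5"); // must be <= 5
        props.setProperty("acks", "all"); // required when idempotence is enabled
        props.setProperty("retries", String.valueOf(Integer.MAX_VALUE));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build());
    }
}
```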
Adding Or Removing Partitions
If the number of partitions for a topic is changed, then ordering guarantees are lost between new messages with a given key and existing messages with the same key written prior to the change. The partition selected for a given key is based on the current number of partitions, so once that number has changed there is no guarantee that new messages with a particular key will be written to the same partition as existing messages with that same key, and hence the ordering guarantee is lost. Of course all new keyed messages written following the change will be guaranteed to be in order relative to other new keyed messages, just not to existing ones. Note that Kafka does not support reducing the number of partitions for a topic, precisely because messages on the removed partitions would be lost.
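The effect can be sketched with the same simplified hash-modulo scheme (illustrative only; Kafka actually uses murmur2 over the serialized key bytes): the chosen partition depends on the partition count, so the same key may map to a different partition once the count changes.

```java
public class PartitionCountChange {

    // Simplified stand-in for Kafka's key-based partition selection
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        String key = "red-entity";
        // After increasing the partition count from 3 to 4, the same key is
        // not guaranteed to map to the partition it mapped to before
        System.out.println("3 partitions -> partition " + partitionFor(key, 3));
        System.out.println("4 partitions -> partition " + partitionFor(key, 4));
    }
}
```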
Spring Boot Demo
Overview
The accompanying Spring Boot application is used to demonstrate the impact of adding a key to a message on the partition it is written to. The application consumes keyed messages from a topic named demo-inbound-topic, and for each one writes an outbound keyed message to a topic named demo-outbound-topic.
Figure 5: Spring Boot demo application
Consumer
As demonstrated in the KafkaDemoConsumer, using Spring Kafka the key of the consumed message is passed to the @KafkaListener message handler when a method argument is provided for the KafkaHeaders header named RECEIVED_KEY.
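A sketch of such a listener method (topic and group names are taken from the article; note that recent Spring Kafka versions name the constant KafkaHeaders.RECEIVED_KEY, while older versions named it RECEIVED_MESSAGE_KEY):

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
public class KafkaDemoConsumer {

    // The consumed message key is bound to the argument annotated with
    // @Header(KafkaHeaders.RECEIVED_KEY)
    @KafkaListener(topics = "demo-inbound-topic", groupId = "demo-consumer-group")
    public void listen(@Header(KafkaHeaders.RECEIVED_KEY) String key,
                       @Payload String payload) {
        // process the keyed message, then emit the outbound keyed message...
    }
}
```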
In order to prove the behaviour, an integration test, KafkaIntegrationTest, using Spring Kafka's embedded broker is provided. This is configured with 10 partitions per topic, and 10,000 messages are sent to the inbound topic with 5 unique keys. The test includes a test listener that tracks the partition that each resulting outbound keyed message is received on, along with the message sequence. It then verifies that each message key is only ever received on the same partition, and that each message for a given key has an ascending sequence number, proving that message ordering is preserved.
Similarly a component test, EndToEndCT, is provided that spins up Docker containers for Kafka, Zookeeper, and the application under test. Using a real Kafka instance this time, once again the test verifies that each message key is only ever received on the same partition.
Kafka Command Line Tools
Kafka provides a set of command line tools for interacting with the broker, including tools for sending messages to and receiving messages from Kafka topics. These are demonstrated below with the accompanying Spring Boot application. Of particular note here is how to include a message key when sending a message, and how to view the message key when consuming a message on the command line.
The demo application can be run up by first building with Maven, then bringing up Kafka and Zookeeper in Docker using the provided docker-compose.yml, and finally executing the application jar:
mvn clean install
docker-compose up -d
java -jar target/kafka-message-keys-1.0.0.jar
The Kafka Docker container contains the command line tools, so jump on to the container with:
docker exec -ti kafka bash
The usual command to produce a message with the kafka-console-producer is given two extra properties: parse.key, set to true, and key.separator, set to the character to use as the separator in the provided message, for example a semi-colon.
The message is now entered on the command line, and this includes the key, which in this example is a JSON formatted String with an id field set to "123", followed by the semi-colon, and then the JSON payload:
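A sketch of the producer command and an example keyed message (the broker address and the payload fields are illustrative assumptions; the topic name is taken from the article):

```shell
# Produce keyed messages, with key and payload separated by a semi-colon
kafka-console-producer \
  --bootstrap-server localhost:9092 \
  --topic demo-inbound-topic \
  --property parse.key=true \
  --property "key.separator=;"

# Then enter the message at the prompt, key first, e.g.:
# {"id": "123"};{"data": "example-payload"}
```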
The demo application consumes this message from the demo-inbound-topic, and emits a keyed message to the demo-outbound-topic, which can be consumed using the kafka-console-consumer. This command also has two extra properties included in order to view the key: print.key, set to true, and key.separator. In order to view messages already written to the topic, the --from-beginning parameter is also included:
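A sketch of the consumer command (the broker address is an illustrative assumption; the topic name is taken from the article):

```shell
# Consume from the start of the topic, printing each message's key before
# its payload, separated by a semi-colon
kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic demo-outbound-topic \
  --property print.key=true \
  --property "key.separator=;" \
  --from-beginning
```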
Conclusion
Kafka guarantees message ordering when related messages are written with the same key. Messages with the same key are written to the same topic partition. As each partition is assigned to a single consumer instance within a consumer group, and messages are consumed in order from each partition, ordering is preserved for messages with the same key. However, if producer retries are enabled for transient errors, then it is important to verify the associated configuration parameters to ensure the ordering guarantee remains.
Source Code
The source code for the accompanying Spring Boot demo application is available here:
Lydtech's Udemy course Introduction to Kafka with Spring Boot covers everything from the core concepts of messaging and Kafka through to step by step code walkthroughs to build a fully functional Spring Boot application that integrates with Kafka. Put together by our team of Kafka and Spring experts, this course is the perfect introduction to using Kafka with Spring Boot.