Introduction
Consuming and producing messages in the JSON format is a popular choice for clients using Kafka. This article looks at how to implement Kafka consumers and producers to handle JSON messages. An accompanying Spring Boot application demonstrates a simple consume and produce flow, using the Spring Kafka library to perform the necessary serialization and deserialization.
The full source code for the demo application is available here.
JSON Serialization
Messages are transported and stored in Kafka as byte arrays, but they can be serialized and deserialized to and from different data types. One common format is the String type. String serialization is demonstrated in the article Kafka Consume & Produce: Spring Boot Demo. Another popular format is JSON. While a JSON message is itself a String, it must additionally conform to a specified format. An example JSON message looks like this:
"id": "8ed0dc67-41a4-4468-81e1-960340d30c92",
"name": "J.Smith",
"age": 29
Advantages of using JSON include the fact that messages are easy to read, follow a specified format, and can be verified via a schema. For example, a schema can specify that the message must have the properties id, name, and age. A JSON message can therefore be validated against such a schema.
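As an illustration, a JSON Schema for the example message above might require those three properties and their types. This schema is illustrative only; the demo application described below does not use schema validation:
{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "type": "object",
  "required": ["id", "name", "age"],
  "properties": {
    "id": { "type": "string" },
    "name": { "type": "string" },
    "age": { "type": "integer" }
  }
}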
Along with the payload, a message can optionally contain a key. If present, the key will also be serialized to and deserialized from a byte array. As with the message payload, the data type of the key can be JSON.
Serialization With Spring Kafka
The Spring Kafka library provides an abstraction over the serialization stages: the consumer deserializing from a byte array, and the producer serializing to a byte array. Spring Kafka takes care of mapping the JSON payload and key to associated plain old Java objects (POJOs) implemented by the developer. The developer then has little left to do other than configure the consumer and producer to use JSON serialization, and of course test the flow.
The configuration can be done in one of two ways. Serialization properties can be declared in the application properties file, and these are then applied to the default Spring beans that Spring loads into the application context at startup. Alternatively the Spring beans can be defined programmatically, and configured as required. The demo application uses the latter approach, which gives the advantage of compile time validation that the types provided are correct and on the application classpath. It can also be clearer to see which beans are being used and how they are configured as more features and more complex configuration are added. Finally it is easier to add variations of the factories when, for example, multiple consumers and producers in the application require different configurations.
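As a rough illustration of the properties-based approach, the JSON serializer and deserializer could be declared in the application properties file like this. This is a minimal sketch using standard Spring Boot Kafka properties; it is not taken from the demo application, which declares its beans programmatically as shown in the Implementation section below:
spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer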
Spring Boot Demo
Overview
The Spring Boot application contains a single Kafka consumer and a single Kafka producer. The consumer listens to the topic demo-inbound-topic, and consumes JSON messages that are deserialized to the payload type DemoInboundPayload. It also expects a key to be present, typed as DemoInboundKey. On receipt of the message the application constructs a JSON DemoOutboundPayload, along with a JSON DemoOutboundKey, which together form the outbound message and are serialized to a byte array. This is emitted to the Kafka topic demo-outbound-topic.
Figure 1: Consuming and producing JSON formatted messages
The message is consumed from the demo-inbound-topic.
The key and value are deserialized to JSON, and mapped to the internal object representations, DemoInboundKey and DemoInboundPayload respectively.
The internal object representations of the JSON outbound message, DemoOutboundKey and DemoOutboundPayload, are serialized to a byte array.
The message is written to the demo-outbound-topic.
Implementation
Consumer
The consumer, KafkaDemoConsumer, is annotated with the Spring Kafka @KafkaListener annotation, and the topics to listen to, the consumer group, and the container factory are all specified here. This tells Spring what messages this consumer is interested in, and Spring then takes care of polling the Kafka topic and passing the messages to this method.
@KafkaListener(
topics = "demo-inbound-topic",
groupId = "demo-consumer-group",
containerFactory = "kafkaListenerContainerFactory")
public void listen(@Header(KafkaHeaders.RECEIVED_KEY) DemoInboundKey key, @Payload DemoInboundPayload payload) {
The key and payload are typed as DemoInboundKey and DemoInboundPayload respectively. These are the POJOs (plain old Java objects) that the developer implements. Spring takes care of deserializing the byte array message to JSON for both the key and the payload, and then maps it to these objects, based on the configuration applied to the KafkaListenerContainerFactory.
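For illustration, the inbound key and payload POJOs might look something like this. This is a minimal sketch based on the field names used later in this article; the demo may use Lombok rather than hand-written accessors:
// DemoInboundKey.java - a no-args constructor and accessors are needed for Jackson to map the JSON.
public class DemoInboundKey {
    private UUID primaryId;
    private UUID secondaryId;

    public DemoInboundKey() {}

    public UUID getPrimaryId() { return primaryId; }
    public void setPrimaryId(final UUID primaryId) { this.primaryId = primaryId; }
    public UUID getSecondaryId() { return secondaryId; }
    public void setSecondaryId(final UUID secondaryId) { this.secondaryId = secondaryId; }
}

// DemoInboundPayload.java
public class DemoInboundPayload {
    private UUID id;
    private String inboundData;

    public DemoInboundPayload() {}

    public UUID getId() { return id; }
    public void setId(final UUID id) { this.id = id; }
    public String getInboundData() { return inboundData; }
    public void setInboundData(final String inboundData) { this.inboundData = inboundData; }
}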
KafkaListenerContainerFactory
The KafkaListenerContainerFactory referenced in the @KafkaListener annotation is another Spring bean, and this is defined in the application configuration class, KafkaDemoConfiguration. The configuration is applied to the KafkaListenerContainerFactory by setting on it the ConsumerFactory, another Spring bean defined in this class. The ConsumerFactory contains a map of key/value properties, including the serialization properties.
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
config.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
config.put(JsonDeserializer.KEY_DEFAULT_TYPE, DemoInboundKey.class.getCanonicalName());
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
config.put(JsonDeserializer.VALUE_DEFAULT_TYPE, DemoInboundPayload.class.getCanonicalName());
For both the message key and message value (payload), the deserializer class to use is specified, along with the default type that the JSON should be mapped to. Notice that the deserializer class is in fact Spring's ErrorHandlingDeserializer, which is told the delegate class to use, the JsonDeserializer. This is important because if the error handler is not used here and a message is consumed that cannot be deserialized to JSON and mapped to the expected type, an exception is thrown. The message is not marked as consumed and is redelivered on the next poll. Another exception is thrown and the message is redelivered again, resulting in a poison pill: the partition is now blocked while this message continually errors. By using the error handler here, Spring handles the exception cleanly. The error is thrown (and the message could, for example, be dead-lettered), and on the next poll this message is skipped, so the partition is not blocked.
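These properties are applied when constructing the ConsumerFactory bean, which is then set on the listener container factory. A minimal sketch of how these two beans might be declared follows; the bootstrap servers value is a placeholder, and the choice of generic types is discussed further below:
@Bean
public ConsumerFactory<Object, Object> consumerFactory() {
    final Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder value
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
    config.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
    config.put(JsonDeserializer.KEY_DEFAULT_TYPE, DemoInboundKey.class.getCanonicalName());
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
    config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
    config.put(JsonDeserializer.VALUE_DEFAULT_TYPE, DemoInboundPayload.class.getCanonicalName());
    return new DefaultKafkaConsumerFactory<>(config);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(final ConsumerFactory<Object, Object> consumerFactory) {
    // The ConsumerFactory, holding the deserialization config, is set on the container factory.
    final ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    return factory;
}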
It is also an option to dispense with specifying the default type for the key and value (i.e. in this case the DemoInboundKey and DemoInboundPayload canonical names), and instead provide this information in message headers. In fact Spring automatically adds this header to outbound events by default, unless it is configured not to with the following configuration in the ProducerFactory (more on the ProducerFactory below):
config.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
If the type info header is present then the consumer will use it by default. The consumer can be configured to ignore the type header if present, and rely on the default type instead, with this configuration in the ConsumerFactory:
config.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false);
As described in the Serialization With Spring Kafka section above, it is possible to define this configuration in the application properties instead of declaring these beans. For example, declaring the JSON deserializer and error handler would look like this:
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
Producer
Once the message has been consumed and processed, an outbound message is sent to the demo-outbound-topic. The producer, KafkaDemoProducer, has Spring's KafkaTemplate bean wired in, and uses this to send the message. Similar to the @KafkaListener for the consumer, the KafkaTemplate is configured with the serialization settings required. There is no explicit serialization code required from the developer.
The send() method is called on the KafkaTemplate, passing the topic name, key (DemoOutboundKey) and payload (DemoOutboundPayload).
kafkaTemplate.send(properties.getOutboundTopic(), key, event).get();
Spring now takes care of writing the message to Kafka. Notice the final call to get() on the result of the send. Without this, the send is asynchronous: it does not wait for acknowledgement that the produce was successful, so it is considered fire and forget. Instead a CompletableFuture is returned, and calling get() on it makes the send synchronous, as it awaits completion of the send before continuing. This then allows any exceptions thrown to be handled as required, perhaps via a retry or by dead-lettering the original message.
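As an illustration, the producer method might wrap the synchronous send in exception handling along these lines. This is a minimal sketch: the method name and the retry/dead-letter behaviour mentioned in the comment are assumptions rather than the demo's actual implementation:
public void sendOutbound(final DemoOutboundKey key, final DemoOutboundPayload payload) {
    try {
        // Block until the send completes, so any failure surfaces here.
        kafkaTemplate.send(properties.getOutboundTopic(), key, payload).get();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException("Interrupted while sending outbound event", e);
    } catch (ExecutionException e) {
        // The send failed - this is where a retry, or dead-lettering of the original message, could be triggered.
        throw new RuntimeException("Error sending outbound event", e);
    }
}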
Once the send() has completed and the listener method returns, the offsets for the consumed message are committed, either by the Kafka client itself where enable.auto.commit is configured as true, or otherwise by Spring's listener container. This marks the message as successfully consumed, so that it is not redelivered as a duplicate.
KafkaTemplate Configuration
The KafkaTemplate Spring bean is declared in KafkaDemoConfiguration, and it is passed a ProducerFactory Spring bean. As with the ConsumerFactory, the ProducerFactory has configuration applied containing a map of key/value properties that include the serialization properties.
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
In this case, as the application will handle any serialization errors, there is no error handling serializer. Instead Spring's JsonSerializer class is specified for the serialization of the key and value being produced. As with the consumer deserialization configuration, this serialization configuration could be defined in the application properties file rather than by declaring the Spring beans.
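For completeness, a minimal sketch of how the ProducerFactory and KafkaTemplate beans might be declared follows; again the bootstrap servers value is a placeholder rather than the demo's actual configuration:
@Bean
public ProducerFactory<Object, Object> producerFactory() {
    final Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder value
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return new DefaultKafkaProducerFactory<>(config);
}

@Bean
public KafkaTemplate<Object, Object> kafkaTemplate(final ProducerFactory<Object, Object> producerFactory) {
    // The KafkaTemplate is built from the ProducerFactory that holds the serialization config.
    return new KafkaTemplate<>(producerFactory);
}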
Kafka Spring Beans Generic Types
The Kafka Spring beans defined in KafkaDemoConfiguration could be strongly typed with the actual key and value types: DemoInboundKey and DemoInboundPayload for the ConcurrentKafkaListenerContainerFactory and ConsumerFactory, and DemoOutboundKey and DemoOutboundPayload for the KafkaTemplate and the ProducerFactory:
public ConcurrentKafkaListenerContainerFactory<DemoInboundKey, DemoInboundPayload> kafkaListenerContainerFactory(final ConsumerFactory<DemoInboundKey, DemoInboundPayload> consumerFactory) {
public KafkaTemplate<DemoOutboundKey, DemoOutboundPayload> kafkaTemplate(final ProducerFactory<DemoOutboundKey, DemoOutboundPayload> producerFactory) {
However, they can equally be typed with Object, or with wildcards indicating unknown types:
public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(final ConsumerFactory<Object, Object> consumerFactory) {
public KafkaTemplate<Object, Object> kafkaTemplate(final ProducerFactory<Object, Object> producerFactory) {
The advantage here is that multiple factories do not need to be declared even if there are multiple consumers or producers using different types. In fact this means the integration test KafkaIntegrationTest can use these same Spring beans and does not need to define its own.
Demo Execution
First of all the Kafka broker and Zookeeper (which Kafka depends on in this version) are brought up in docker containers, using the docker-compose.yml file. Start up docker locally, and in a terminal in the root of the project, enter:
docker-compose up -d
Build and run the application:
mvn clean install
java -jar target/kafka-json-serialization-1.0.0.jar
In order to send in an event, jump onto the Kafka docker container, which has the Kafka command line tools, and use these to produce an event to the demo-inbound-topic, which the application will consume:
docker exec -ti kafka bash
kafka-console-producer \
--topic demo-inbound-topic \
--broker-list kafka:29092 \
--property "key.separator=:" \
--property parse.key=true
Of special note here is the syntax of the event that is submitted. The key prefixes the payload, with a semi-colon as the separator. Looking at the DemoInboundKey Java representation shows that the key takes two ids, primaryId and secondaryId, which are both UUIDs. The key is followed by the message payload. DemoInboundPayload shows this requires an id (another UUID) and a String inboundData. For example, submit the following message:
{"primaryId":"f6914736-bbd1-4a4b-b1a7-58fac2d0e4d2", "secondaryId": "c081f385-38e4-4a1d-a93e-5412eeb0121d"};{"id": "168f141e-93ae-427f-9e88-2eda6ac5823d", "inboundData": "my-data"}
This is consumed by the application, and an outbound event is emitted to the demo-outbound-topic. To see this, jump onto the Kafka container in another terminal window, and start up the console consumer to view the message:
kafka-console-consumer \
--topic demo-outbound-topic \
--bootstrap-server kafka:29092 \
--from-beginning \
--property print.key=true \
--property key.separator=";"
The application maps the inbound key's primaryId to the outbound key's id field. The payload consists of the id from the inbound payload, and the inboundData, which is decorated to create the outboundData. So the following message is consumed by the console consumer and displayed:
{"id":"f6914736-bbd1-4a4b-b1a7-58fac2d0e4d2"};{"id":"168f141e-93ae-427f-9e88-2eda6ac5823d","outboundData":"Processed: my-data"}
Testing
Unit Tests
As the serialization and deserialization are taken care of by Spring, unit testing the consumer and producer is straightforward, as the tests just use the plain Java types. For the consume flow, the service that the key and payload are passed to is mocked using the Mockito mocking framework. The tests for this are defined in KafkaDemoConsumerTest. Likewise for the produce flow, the KafkaTemplate can be mocked, to test that the demo producer behaves as expected. These tests are defined in KafkaDemoProducerTest.
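As an illustration, a unit test for the produce side might mock the KafkaTemplate along these lines. This is a sketch only: it exercises the sendOutbound method sketched earlier, the constructor wiring and the KafkaDemoProperties type are assumptions rather than the demo's actual classes, and Mockito's static methods are assumed to be statically imported:
@Test
public void testOutboundEventSent() {
    final KafkaTemplate<Object, Object> kafkaTemplateMock = mock(KafkaTemplate.class);
    when(kafkaTemplateMock.send(anyString(), any(), any()))
            .thenReturn(CompletableFuture.completedFuture(null));

    final KafkaDemoProperties propertiesMock = mock(KafkaDemoProperties.class); // hypothetical properties type
    when(propertiesMock.getOutboundTopic()).thenReturn("demo-outbound-topic");

    final KafkaDemoProducer producer = new KafkaDemoProducer(propertiesMock, kafkaTemplateMock); // hypothetical wiring

    producer.sendOutbound(new DemoOutboundKey(), new DemoOutboundPayload());

    // Verify the message was sent to the expected topic with the expected key and payload types.
    verify(kafkaTemplateMock).send(eq("demo-outbound-topic"), any(DemoOutboundKey.class), any(DemoOutboundPayload.class));
}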
Integration Tests
The Spring Kafka Test library provides an embedded broker that can be used for the Spring Boot integration tests. This enables messages to be consumed from and produced to a broker using the Kafka APIs, to verify that this is working as expected. It therefore validates that the application is able to start up and connect to the broker successfully. The integration tests also exercise the serialization and deserialization, proving that this too is correctly configured.
The integration tests for this demo are in KafkaIntegrationTest. The class is annotated with Spring Kafka Test's @EmbeddedKafka annotation, which is all that is required to enable the embedded broker:
@EmbeddedKafka(controlledShutdown = true, topics = { "demo-inbound-topic", "demo-outbound-topic" })
As the test is responsible for sending a JSON message to the broker, and then consuming the outbound message from the application, a set of Spring beans must be defined that are configured accordingly, including the consumer and producer factories. A test listener class is also defined. This records the messages it receives, which the tests can then use to assert that the expected messages were emitted by the application. The test uses the Awaitility testing library to poll the test listener until the received counter has been incremented.
kafkaTemplate.send(DEMO_INBOUND_TEST_TOPIC, inboundKey, inboundPayload).get();
Awaitility.await().atMost(1, TimeUnit.SECONDS).pollDelay(100, TimeUnit.MILLISECONDS)
.until(testReceiver.counter::get, equalTo(1));
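The test listener itself might look something like this. This is a minimal sketch of the pattern described above; the class name, group id and counter field are illustrative rather than copied from KafkaIntegrationTest:
public static class KafkaTestListener {

    final AtomicInteger counter = new AtomicInteger(0);

    @KafkaListener(groupId = "KafkaIntegrationTest", topics = "demo-outbound-topic")
    void receive(@Header(KafkaHeaders.RECEIVED_KEY) final DemoOutboundKey key, @Payload final DemoOutboundPayload payload) {
        // Record the received message so the test's Awaitility poll can assert on it.
        counter.incrementAndGet();
    }
}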
Component Tests
The next level of testing up from integration tests is component tests. These test the application against a running instance of Kafka. The component test for this demo application uses Lydtech's component-test-framework to spin up a dockerised Kafka and Zookeeper, and a dockerised instance of the application under test. It uses the Testcontainers testing library under the hood. These tests give full confidence that the application can start and talk to a real Kafka instance. As with unit tests and integration tests, they should also be automated.
The component test is EndToEndCT. By using the JUnit @ExtendWith feature, the test automatically instructs the component-test-framework to spin up, manage and orchestrate the required docker containers:
@ExtendWith(TestContainersSetupExtension.class)
The configuration for the docker containers is defined in the pom.xml, in the component maven profile section.
<service.name>${project.name}</service.name>
<service.instance.count>1</service.instance.count>
<kafka.enabled>true</kafka.enabled>
Conclusion
As the accompanying Spring Boot application demonstrates, Spring makes it easy for the developer to implement an application that handles JSON messages. Beyond defining the Java types that map to the JSON structures for the key and payload, and applying the correct configuration to the Spring beans responsible for the serialization and deserialization, there is little else required other than testing the flows. Integration tests using Spring Boot Test and component tests using Testcontainers provide the tools required to undertake this testing.
Source Code
The source code for the accompanying Spring Boot demo application is available here:
https://github.com/lydtechconsulting/kafka-json-serialization/tree/v1.0.0
Lydtech's Udemy course Introduction to Kafka with Spring Boot covers everything from the core concepts of messaging and Kafka through to step-by-step code walkthroughs to build a fully functional Spring Boot application that integrates with Kafka. Put together by our team of Kafka and Spring experts, this course is the perfect introduction to using Kafka with Spring Boot.