添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

Introduction

Consuming and producing messages in the JSON format is a popular choice for clients using Kafka. This article demonstrates how to implement Kafka consumers and producers to handle JSON messages. An accompanying Spring Boot application demonstrates a simple consume and produce flow, using the Spring Kafka library to perform the serialization necessary.

The full source code for the demo application is available here .

JSON Serialization

Messages are transported and stored in Kafka as byte arrays, but they can be serialized and deserialized to/from different data types. One common format is the String type. String serialization is demonstrated in the article Kafka Consume & Produce: Spring Boot Demo . Another popular format is JSON. While a JSON message is also a String, it additionally defines that the String must be in a specified format. An example JSON message looks like this:

"id": "8ed0dc67-41a4-4468-81e1-960340d30c92", "name": "J.Smith", "age": 29

Advantages of using JSON include the fact that they are easy to read, follow a specified format, and can be verified via a schema. For example a schema can specify that the message must have properties id , name , and age . A JSON message can therefore be validated against such a schema.

Along with the payload, a message can optionally contain a key. If present, the key also will be serialized and deserialized to a byte array. And as with the message payload, the data type of the key can be JSON.

Serialization With Spring Kafka

The Spring Kafka library provides an abstraction over the serialization stages, that is the consumer deserialization from a byte array, and the producer serialization to a byte array. Spring Kafka takes care of mapping the JSON payload and key to associated plain old Java objects (POJOs) implemented by the developer. Beyond this the developer has little to do beyond configuring the consumer and producer to use JSON serialization, and of course testing the flow.

The configuration can be done one of two ways. Serialization properties can be declared in the application properties file, which are then applied to the default Spring beans that Spring loads into the application context at startup. Alternatively the Spring beans can be defined programmatically, and configured as required. The demo application uses the latter approach, which gives the advantage of providing compile time validation that the types provided are correct and on the application classpath. It can also be clearer to see what beans are being used and how they are configured once more features and complex configurations are being added. Finally it is easier to add variations of factories, for example, when multiple application consumers and producers require different configurations.

Spring Boot Demo

Overview

The Spring Boot application contains a single Kafka consumer and a single Kafka producer. The consumer listens to a topic demo-inbound-topic , and consumes messages that are deserialized to JSON. This payload is typed as DemoInboundPayload . It also expects a key to be present, and is typed as DemoInboundKey . On receipt of the message the application then constructs a JSON DemoOutboundPayload , along with a JSON DemoOutboundKey which together form the outbound message and are serialized to a byte array. This is emitted to the Kafka topic demo-outbound-topic .

Figure 1: Consuming and producing JSON formatted messages

Step 1 The message is consumed from the inbound-demo-topic .

Step 2 The key and value are deserialized to JSON, which are mapped to internal object representations, DemoInboundKey and DemoInboundPayload respectively.

Step 3 The internal object representations of the JSON outbound message, DemoOutboundKey and DemoOutboundPayload are serialized to a byte array.

Step 4 The message is written to the outbound-demo-topic .

Implementation

Consumer

The consumer, KafkaDemoConsumer , is annotated with the Spring Kafka @KafkaListener annotation, and the topics to listen to, the consumer group, and the container factory are all specified here. This tells Spring what messages this consumer is interested in, and Spring then takes care of polling the Kafka topic and passing the messages to this method.

@KafkaListener(
       topics = "demo-inbound-topic",
       groupId = "demo-consumer-group",
       containerFactory = "kafkaListenerContainerFactory")
public void listen(@Header(KafkaHeaders.RECEIVED_KEY) DemoInboundKey key, @Payload DemoInboundEvent payload) {

The key and payload are typed, DemoInboundKey and DemoInboundPayload respectively. These are the POJOs (plain old Java objects) that the developer implements. Spring takes care of the deserialization of the byte array message to JSON for both the key and the payload, and then maps it to these objects, based on the configuration applied to the KafkaListenerContainerFactory .

KafkaListenerContainerFactory

The KafkaListenerContainerFactory referenced in the @KafkaListener annotation is another Spring bean, and this is defined in the application configuration class, KafkaDemoConfiguration . The configuration is applied to the KafkaListenerContainerFactory by setting on it the ConsumerFactory , another Spring bean defined in this class. The ConsumerFactory contains a map of key/value properties, including the serialization properties.

config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
config.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
config.put(JsonDeserializer.KEY_DEFAULT_TYPE, DemoInboundKey.class.getCanonicalName());
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
config.put(JsonDeserializer.VALUE_DEFAULT_TYPE, DemoInboundPayload.class.getCanonicalName());

For both the message key and message value (payload), the deserializer class to use is specified, and the default type that the JSON should be mapped to. Notice that the deserializer class is in fact Spring's ErrorHandlingDeserializer , and this is told the delegate class to use, the JsonDeserializer . This is important as if the error handler is not used here, and a message is consumed that cannot be deserialized to JSON and mapped to the expected type, an exception is thrown. The message is not marked as consumed and is redelivered on the next poll. Again another exception is thrown and the message is redelivered, resulting in a poison pill - the partition is now blocked while this message continually errors. By using the error handler here, Spring handles the exception cleanly. The error is thrown (and the message could for example be dead-lettered), and on the next poll this message is skipped, so the partition is not blocked.

It is also an option to dispense with specifying the default type for the key and value (i.e. in this case the DemoInboundKey and DemoInboundPayload canonical names), and instead provide this information in message headers. In fact Spring automatically adds this header by default to outbound events, unless it is configured not to with the following configuration in the ProducerFactory (more on the ProducerFactory below):

config.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);

If the type info header is present then the consumer will use this by default. It can be configured to ignore the type header if present, and rely on the default type instead, with this configuration in the ConsumerFactory :

config.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false);

As described in the JSON Serialization section above, it is possible to define this configuration in the application properties instead of declaring these beans. For example, declaring the JSON deserializer and error handler would look like this:

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer

Producer

Once the message has been consumed and processed, an outbound message is sent to the demo-outbound-topic . The producer, KafkaDemoProducer , has Spring's KafkaTemplate bean wired in, and uses this to send the message. Similar to the @KafkaListener for the consumer, the KafkaTemplate is configured as required for the necessary serialization. There is no explicit serialization coding required by the developer.

The send() method is called on the KafkaTemplate passing the topic name, key ( DemoOutboundKey ) and payload ( DemoOutboundPayload ).

kafkaTemplate.send(properties.getOutboundTopic(), key, event).get();

Spring now takes care of writing the message to Kafka. Notice the final call to get() on the result of the send. Without this, the send is asynchronous. The send does not wait for acknowledgement that the produce was successful, so is considered fire and forget. Instead a CompletableFuture is returned, and the call to get() on it makes the send synchronous, as it will await completion of the send before continuing. This then allows any exceptions thrown to be handled as required, perhaps via a retry or by dead-lettering the original message.

By default when the send() has completed, where enable.auto.commit is configured as true , Spring will also commit the offsets for the consumed message, marking it as successfully consumed, ensuring it will not be redelivered as a duplicate.

KafkaTemplate Configuration

The KafkaTemplate Spring bean is declared in the KafkaDemoConfiguration , and it is passed a ProducerFactory Spring bean. As with the ConsumerFactory , the ProducerFactory has the configuration applied containing a Map of key/value properties that include the serialization properties.

config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

In this case as the application will handle any serialization errors, there is no error handling serializer. Instead the Spring JsonSerializer class is specified for the serialization of the key and value being produced. As with the consumer deserialization configuration, this serialization configuration could be defined in the application properties file rather than via declaring the Spring beans.

Kafka Spring Beans Generic Types

The Kafka Spring beans defined in KafkaDemoConfiguration could be strongly typed with the actual key and value types. For example DemoInboundKey and DemoInboundPayload , for the ConcurrentKafkaListenerContainerFactory and ConsumerFactory , and DemoOutboundKey and DemoOutboundPayload for the KafkaTemplate and the ProducerFactory :

public ConcurrentKafkaListenerContainerFactory<DemoInboundKey, DemoInboundPayload> kafkaListenerContainerFactory(final ConsumerFactory<DemoInboundKey, DemoInboundPayload> consumerFactory) {
public KafkaTemplate<DemoOutboundKey, DemoOutboundEvent> kafkaTemplate(final ProducerFactory<DemoOutboundKey, DemoOutboundEvent> producerFactory) {

However, they can equally be typed with Object , or wildcards indicating unknown types:

public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(final ConsumerFactory<Object, Object> consumerFactory) {
public KafkaTemplate<Object, Object> kafkaTemplate(final ProducerFactory<Object, Object> producerFactory) {

The advantage here is that multiple factories do not need to be declared even if there are multiple consumers or producers using different types. In fact this means the integration test KafkaIntegrationTest can use these same Spring beans and does not need to define its own.

Demo Execution

First of all the Kafka server and Zookeeper (which Kafka depends on in this version) are brought up in docker containers, using the docker-compose.yml file. Start up docker locally, and in a terminal in the root of the project, enter:

docker-compose up -d

Build and run the application:

mvn clean install
java -jar target/kafka-json-serialization-1.0.0.jar

In order to send in an event, jump onto the Kafka docker container which has the Kafka command line tools, and use this to produce an event to the demo-inbound-topic , which the application will consume:

docker exec -ti kafka bash
kafka-console-producer \
--topic demo-inbound-topic \
--broker-list kafka:29092 \
--property "key.separator=:" \
--property parse.key=true

Of special note here is the syntax of the event that is submitted. The key prefixes the payload, separated by a colon. Looking at the DemoInboundKey Java representation shows that the key takes two ids, primaryId and secondaryId , which are both UUIDs. This is followed by the message payload, with a semi-colon as the separator. DemoInboundPayload shows this requires an id (another UUID) and a String inboundData . For example, submit the following message:

{"primaryId":"f6914736-bbd1-4a4b-b1a7-58fac2d0e4d2", "secondaryId": "c081f385-38e4-4a1d-a93e-5412eeb0121d"};{"id": "168f141e-93ae-427f-9e88-2eda6ac5823d", "inboundData": "my-data"}

This is consumed by the application, and an outbound event is emitted to the demo-outbound-topic . To see this, jump onto the kafka container in another terminal window, and start up the console consumer to view the message:

kafka-console-consumer \
--topic demo-outbound-topic \
--bootstrap-server kafka:29092 \
--from-beginning \
--property print.key=true \
--property key.separator=";"

The application maps the inbound key's primaryId to the outbound key's id field. The payload consists of the id from the inbound payload's id , and the inboundData which is decorated to create the outboundData . So the following message is consumed by the console consumer and displayed:

{"id":"f6914736-bbd1-4a4b-b1a7-58fac2d0e4d2"};{"id":"168f141e-93ae-427f-9e88-2eda6ac5823d","outboundData":"Processed: my-data"}

Testing

Unit Tests

As the serialization and deserialization are taken care of by Spring, unit testing the consumer and producer is straightforward, as the tests just use the simple Java types. For the consume, the service that the key and payload are passed is mocked using the Mockito mocking framework. The tests for this are defined in KafkaDemoConsumerTest . Likewise for the produce, the KafkaTemplate can be mocked, to test that the demo producer behaves as expected. These tests are defined in KafkaDemoProducerTest .

Integration Tests

The Spring Kafka Test library provides an embedded broker that can be used for the Spring Boot integration tests. This enables messages to be consumed from and produced to a broker using the Kafka APIs, to verify that this is working as expected. It validates therefore that the application is able to start up and connect to the broker successfully. The integration tests also therefore exercise the serialization and deserialization, so prove that this is also correctly configured.

The integration tests for this demo are in KafkaIntegrationTest . The class is annotated with Spring Kafka Test's @EmbeddedKafka annotation, which is all that is required to enable the embedded broker:

@EmbeddedKafka(controlledShutdown = true, topics = { "demo-inbound-topic", "demo-outbound-topic" })

As the test is responsible for sending in a JSON message to the broker, and then consuming the outbound message from the application, the set of Spring beans must be defined that are configured accordingly, including the consumer and producer factories. Also defined is a test listener class. This records the messages it receives which the tests can then use to assert that the expected messages were emitted by the application. It uses the Awaitility testing library to poll the test listener until the received counter has been incremented.

kafkaTemplate.send(DEMO_INBOUND_TEST_TOPIC, inboundKey, inboundPayload).get();
Awaitility.await().atMost(1, TimeUnit.SECONDS).pollDelay(100, TimeUnit.MILLISECONDS)
       .until(testReceiver.counter::get, equalTo(1));

Component Tests

The next level of testing up from integration tests is component tests. These test the application with a running instance of Kafka. The component test for this demo application uses Lydtech's component-test-framework to spin up a dockerised Kafka and Zookeeper, and a dockerised instance of the application under test. It uses the Testcontainers testing library under the hood. These tests give full confidence that the application can start and talk to a real Kafka instance. As with unit tests and integration tests they should also be automated.

The component test is EndToEndCT . By using the JUnit @ExtendWith feature, the test will automatically instruct the component-test-framework to spin up, manage and orchestrate the required docker containers:

@ExtendWith(TestContainersSetupExtension.class)

The configuration for the docker containers is defined in the pom.xml , in the component maven profile section.

<service.name>${project.name}</service.name>
<service.instance.count>1</service.instance.count>
<kafka.enabled>true</kafka.enabled>

Conclusion

As the accompanying Spring Boot application demonstrates, Spring makes it easy for the developer to implement an application that handles JSON messages. Beyond defining the Java types that map to the JSON structures for the key and payload, and applying the correct configuration to the Spring beans responsible for the serialization and deserialization, there is little else required beyond testing the flows. Integration tests using Spring Boot Test and component tests using Testcontainers then provide the tools required to undertake this testing.

Source Code

The source code for the accompanying Spring Boot demo application is available here:

https://github.com/lydtechconsulting/kafka-json-serialization/tree/v1.0.0

Lydtech's Udemy course Introduction to Kafka with Spring Boot covers everything from the core concepts of messaging and Kafka through to step by step code walkthroughs to build a fully functional Spring Boot application that integrates with Kafka. Put together by our team of Kafka and Spring experts, this course is the perfect introduction to using Kafka with Spring Boot.