@Component
class MyBean {
@KafkaListener(topics = ["someTopic"])
fun processMessage(content: String?) {
// ...
If a KafkaTransactionManager
bean is defined, it is automatically associated to the container factory.
Similarly, if a RecordFilterStrategy
, CommonErrorHandler
, AfterRollbackProcessor
or ConsumerAwareRebalanceListener
bean is defined, it is automatically associated to the default factory.
Depending on the listener type, a RecordMessageConverter
or BatchMessageConverter
bean is associated to the default factory.
If only a RecordMessageConverter
bean is present for a batch listener, it is wrapped in a BatchMessageConverter
.
Spring for Apache Kafka provides a factory bean to create a StreamsBuilder
object and manage the lifecycle of its streams.
Spring Boot auto-configures the required KafkaStreamsConfiguration
bean as long as kafka-streams
is on the classpath and Kafka Streams is enabled by the @EnableKafkaStreams
annotation.
Enabling Kafka Streams means that the application id and bootstrap servers must be set.
The former can be configured using spring.kafka.streams.application-id
, defaulting to spring.application.name
if not set.
The latter can be set globally or specifically overridden only for streams.
Several additional properties are available using dedicated properties; other arbitrary Kafka properties can be set using the spring.kafka.streams.properties
namespace.
See also Additional Kafka Properties for more information.
To use the factory bean, wire StreamsBuilder
into your @Bean
as shown in the following example:
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.support.serializer.JsonSerde;
@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
public class MyKafkaStreamsConfiguration {
@Bean
public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), new JsonSerde<>()));
return stream;
private KeyValue<Integer, String> uppercaseValue(Integer key, String value) {
return new KeyValue<>(key, value.toUpperCase());
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Produced
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafkaStreams
import org.springframework.kafka.support.serializer.JsonSerde
@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
class MyKafkaStreamsConfiguration {
@Bean
fun kStream(streamsBuilder: StreamsBuilder): KStream<Int, String> {
val stream = streamsBuilder.stream<Int, String>("ks1In")
stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), JsonSerde()))
return stream
private fun uppercaseValue(key: Int, value: String): KeyValue<Int?, String?> {
return KeyValue(key, value.uppercase())
The properties supported by auto configuration are shown in the “Integration Properties” section of the Appendix.
Note that, for the most part, these properties (hyphenated or camelCase) map directly to the Apache Kafka dotted properties.
See the Apache Kafka documentation for details.
Properties that don’t include a client type (producer
, consumer
, admin
, or streams
) in their name are considered to be common and apply to all clients.
Most of these common properties can be overridden for one or more of the client types, if needed.
Apache Kafka designates properties with an importance of HIGH, MEDIUM, or LOW.
Spring Boot auto-configuration supports all HIGH importance properties, some selected MEDIUM and LOW properties, and any properties that do not have a default value.
Only a subset of the properties supported by Kafka are available directly through the KafkaProperties
class.
If you wish to configure the individual client types with additional properties that are not directly supported, use the following properties:
spring.kafka.properties[prop.one]=first
spring.kafka.admin.properties[prop.two]=second
spring.kafka.consumer.properties[prop.three]=third
spring.kafka.producer.properties[prop.four]=fourth
spring.kafka.streams.properties[prop.five]=fifth
This sets the common prop.one
Kafka property to first
(applies to producers, consumers, admins, and streams), the prop.two
admin property to second
, the prop.three
consumer property to third
, the prop.four
producer property to fourth
and the prop.five
streams property to fifth
.
You can also configure the Spring Kafka JsonDeserializer
as follows:
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties[spring.json.value.default.type]=com.example.Invoice
spring.kafka.consumer.properties[spring.json.trusted.packages]=com.example.main,com.example.another
kafka:
consumer:
value-deserializer: "org.springframework.kafka.support.serializer.JsonDeserializer"
properties:
"[spring.json.value.default.type]": "com.example.Invoice"
"[spring.json.trusted.packages]": "com.example.main,com.example.another"
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties[spring.json.add.type.headers]=false
kafka:
producer:
value-serializer: "org.springframework.kafka.support.serializer.JsonSerializer"
properties:
"[spring.json.add.type.headers]": false
Spring for Apache Kafka provides a convenient way to test projects with an embedded Apache Kafka broker.
To use this feature, annotate a test class with @EmbeddedKafka
from the spring-kafka-test
module.
For more information, please see the Spring for Apache Kafka reference manual.
To make Spring Boot auto-configuration work with the aforementioned embedded Apache Kafka broker, you need to remap a system property for embedded broker addresses (populated by the EmbeddedKafkaBroker
) into the Spring Boot configuration property for Apache Kafka.
There are several ways to do that:
static {
System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
init {
System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers")
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;
@SpringBootTest
@EmbeddedKafka(topics = "someTopic", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {
// ...
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.kafka.test.context.EmbeddedKafka
@SpringBootTest
@EmbeddedKafka(topics = ["someTopic"], bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {
// ...
Apache®, Apache Tomcat®, Apache Kafka®, Apache Cassandra™, and Apache Geode™ are trademarks or registered trademarks of the Apache Software Foundation in the United States and/or other countries. Java™, Java™ SE, Java™ EE, and OpenJDK™ are trademarks of Oracle and/or its affiliates. Kubernetes® is a registered trademark of the Linux Foundation in the United States and other countries. Linux® is the registered trademark of Linus Torvalds in the United States and other countries. Windows® and Microsoft® Azure are registered trademarks of Microsoft Corporation. “AWS” and “Amazon Web Services” are trademarks or registered trademarks of Amazon.com Inc. or its affiliates. All other trademarks and copyrights are property of their respective owners and are only mentioned for informative purposes. Other names may be trademarks of their respective owners.