本篇文章内容很全,很长,很细!不要心急,慢慢看!我都写完了,相信你看完肯定可以的,有任何问题可以随时交流!
本篇文章内容很全,很长,很细!不要心急,慢慢看!我都写完了,相信你看完肯定可以的,有任何问题可以随时交流!
本篇文章内容很全,很长,很细!不要心急,慢慢看!我都写完了,相信你看完肯定可以的,有任何问题可以随时交流!
本篇文章主要介绍Spring Kafka的常用配置、主题自动创建、发布消息到集群、订阅消息(群组)、流处理配置以及嵌入式Kafka做测试配置相关内容,最后通过两种方式去实现消息的发布和订阅功能,其中一种是基于
Spring Integration
方式。本文内容基于
Spring Kafka2.3.3文档
及
Spring Boot Kafka相关文档
,Spring创建了一个名为
Spring kafka
的项目,它封装了Apache的kafka客户端部分(生产者/消费者/流处理等),以便在Spring项目中快速集成kafka,
Spring-Kafka
项目提供了Apache Kafka自动化配置,通过Spring Boot的简化配置(以
spring.kafka.*
作为前缀的配置参数),在Spring Boot中使用Kafka特别简单。并且Spring Boot还提供了一个嵌入式Kafka代理方便做测试。
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
实现下面的所涉及到的功能实现,需要有如下环境:
Java运行或开发环境(JRE/JDK)
Kafka安装成功
更多的配置可以参考《Kafka,ZK集群开发或部署环境搭建及实验》
这一篇文章。
本文尽量做到阐述逻辑清晰,主要路线就是全局介绍Spring Kafka的主要功能及重点配置,而Spring Boot对Spring Kafka进一步简化配置,通过Spring Boot中的Kafka几大注解实现发布订阅功能,同时通过Spring Integration + 自定义Kafka配置方式实现一个较为复杂的Kafka发布订阅功能,本文通过自己实验和整理了较久的时间,涵盖了Spring Kafka大部分内容,希望大家耐心读下来,有什么问题随时反馈,一起学习。
2 Spring Kafka功能概览
Spring Kafka、Spring Integration和Kafka客户端版本联系或者兼容性如下(截至2019年12月9日):
Spring for Apache Kafka
Spring Integration for Apache Kafka Version
kafka-clients
如使用@EnableKafka
可以监听AbstractListenerContainerFactory
子类目标端点,如ConcurrentKafkaListenerContainerFactory
是AbstractKafkaListenerContainerFactory
的子类。
public class ConcurrentKafkaListenerContainerFactory<K,V>
extends AbstractKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<K,V>,K,V>
@Configuration
@EnableKafka
public class AppConfig {
@Bean
public ConcurrentKafkaListenerContainerFactory myKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(4);
return factory;
// other @Bean definitions
@EnableKafka
并不是在Spring Boot中启用Kafka必须的,Spring Boot附带了Spring Kafka的自动配置,因此不需要使用显式的@EnableKafka
。如果想要自己实现Kafka配置类,则需要加上@EnableKafka
,如果你不想要Kafka自动配置,比如测试中,需要做的只是移除KafkaAutoConfiguration
:
@SpringBootTest("spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration")
2.1 自动创建主题
💡 要在应用启动时就创建主题,可以添加NewTopic
类型的Bean。如果该主题已经存在,则忽略Bean。
2.2 发送消息
Spring的KafkaTemplate
是自动配置的,你可以直接在自己的Bean中自动连接它,如下例所示:
@Component
public class MyBean {
private final KafkaTemplate kafkaTemplate;
@Autowired
public MyBean(KafkaTemplate kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
// ...
KafkaTemplate
包装了一个生产者,并提供了向kafka主题发送数据的方便方法。提供异步和同步(发送阻塞)方法,异步(发送非阻塞)方法返回ListenableFuture
,以此监听异步发送状态,成功还是失败,KafkaTemplate提供如下接口:
ListenableFuture<SendResult<K, V>> sendDefault(V data);
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
ListenableFuture<SendResult<K, V>> send(Message<?> message);
Map<MetricName, ? extends Metric> metrics();
List<PartitionInfo> partitionsFor(String topic);
<T> T execute(ProducerCallback<K, V, T> callback);
// Flush the producer.
void flush();
interface ProducerCallback<K, V, T> {
T doInKafka(Producer<K, V> producer);
sendDefault
API 要求已向模板提供默认主题。部分API接受一个时间戳作为参数,并将该时间戳存储在记录中,如何存储用户提供的时间戳取决于Kafka主题上配置的时间戳类型,如果主题配置为使用CREATE_TIME
,则记录用户指定的时间戳(如果未指定则生成)。如果将主题配置为使用LOG_APPEND_TIME
,则忽略用户指定的时间戳,并且代理将添加本地代理时间。metrics
和 partitionsFor
方法委托给底层Producer上的相同方法。execute方法提供对底层生产者的直接访问
要使用模板,可以配置一个生产者工厂并在模板的构造函数中提供它。下面的示例演示了如何执行此操作:
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// See https://kafka.apache.org/documentation/#producerconfigs for more properties
return props;
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
// KafkaTemplate构造函数中输入生产者工厂配置
return new KafkaTemplate<Integer, String>(producerFactory());
然后,要使用模板,可以调用其方法之一发送消息。
当你使用包含Message<?>
参数的方法时,主题、分区和键信息在消息头中提供,有如下子项:
KafkaHeaders.TOPIC
KafkaHeaders.PARTITION_ID
KafkaHeaders.MESSAGE_KEY
KafkaHeaders.TIMESTAMP
如访问头部信息中某一项信息:
public void handleMessage(Message<?> message) throws MessagingException {
LOGGER.debug("===Received Msg Topic: {}", message.getHeaders().get(KafkaHeaders.TOPIC));
可选的功能是,可以使用ProducerListener
配置KafkaTemplate
,以获得带有发送结果(成功或失败)的异步回调,而不是等待将来完成。以下列表显示了ProducerListener
接口的定义:
public interface ProducerListener<K, V> {
void onSuccess(String topic, Integer partition, K key, V value, RecordMetadata recordMetadata);
void onError(String topic, Integer partition, K key, V value, Exception exception);
boolean isInterestedInSuccess();
默认情况下,模板配置有LoggingProducerListener
,它只记录错误,在发送成功时不执行任何操作。只有当isInterestedInSuccess
返回true时才调用onSuccess
。 为了方便起见,如果你只想实现其中一个方法,那么将提供抽象ProducerListenerAdapter
。对于isInterestedInSuccess
,它返回false。下面演示了异步结果回调:
public void sendMessage(String msg) {
LOGGER.info("===Producing message[{}]: {}", mTopic, msg);
ListenableFuture<SendResult<String, String>> future = mKafkaTemplate.send(mTopic, msg);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
LOGGER.info("===Producing message success");
@Override
public void onFailure(Throwable ex) {
LOGGER.info("===Producing message failed");
如果希望阻止式发送线程等待结果,可以调用future
的get()
方法。你可能希望在等待之前调用flush()
,或者为了方便起见,模板有一个带有autoFlush
参数的构造函数,该构造函数在每次发送时都会导致模板flush()
。不过,请注意,刷新可能会显著降低性能:
public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);
try {
template.send(record).get(10, TimeUnit.SECONDS);
handleSuccess(data);
catch (ExecutionException e) {
handleFailure(data, record, e.getCause());
catch (TimeoutException | InterruptedException e) {
handleFailure(data, record, e);
使用DefaultKafkaProducerFactory:
如上面使用KafkaTemplate
中所示,ProducerFactory
用于创建生产者。默认情况下,当不使用事务时,DefaultKafkaProducerFactory
会创建一个供所有客户机使用的单例生产者,如KafkaProducer
javadocs中所建议的那样。但是,如果对模板调用flush(),这可能会导致使用同一个生产者的其他线程延迟。从2.3版开始,DefaultKafkaProducerFactory
有一个新属性producerPerThread
。当设置为true
时,工厂将为每个线程创建(和缓存)一个单独的生产者,以避免此问题。
当producerPerThread
为true时,当不再需要生产者时,用户代码必须在工厂上调用closeThreadBoundProducer()
。这将实际关闭生产者并将其从ThreadLocal
中移除。调用reset()或destroy()不会清理这些生产者。
创建DefaultKafkaProducerFactory
时,可以通过调用只接受属性映射的构造函数(请参阅使用KafkaTemplate中的示例)从配置中获取键和/或值序列化器类,或者序列化程序实例可以传递给DefaultKafkaProducerFactory
构造函数(在这种情况下,所有生产者共享相同的实例)。或者,可以提供Supplier<Serializer> s
(从版本2.3开始),用于为每个生产者获取单独的Serializer
实例:
@Bean
public ProducerFactory<Integer, CustomValue> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
@Bean
public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
return new KafkaTemplate<Integer, CustomValue>(producerFactory());
使用ReplyingKafkaTemplate:
版本2.1.3
引入了KafkaTemplate
的一个子类来提供请求/应答语义。这个类名为ReplyingKafkaTemplate
,并且有一个方法(除了超类中的那些方法之外)。下面的列表显示了方法签名:
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,
Duration replyTimeout);
结果是一个ListenableFuture
,它被结果异步填充(或者超时时出现异常)。结果还有一个sendFuture
属性,这是调用KafkaTemplate.send()
的结果。你可以使用此Future确定发送操作的结果。这里就不展开了。
2.3 接收消息
可以通过配置MessageListenerContainer
并提供消息监听器或使用@KafkaListener
注解来接收消息。
2.3.1 消息监听器
使用消息监听器容器(message listener container)时,必须提供监听器才能接收数据。目前有八个消息监听器支持的接口。下面的列表显示了这些接口:
// 使用自动提交或容器管理的提交方法之一时,使用此接口处理从Kafka 消费者 poll() 作接收的单个ConsumerRecord实例
public interface MessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data);
// 使用手动提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操作接收的单个ConsumerRecord实例
public interface AcknowledgingMessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
// 使用自动提交或容器管理的提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操作接收的单个ConsumerRecord实例。提供对消费者对象的访问。
public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
// 使用手动提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操作接收的单个ConsumerRecord实例。提供对消费者对象的访问。
public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
// 使用自动提交或容器管理的提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操作接收的所有ConsumerRecord实例。使用此接口时不支持AckMode.RECORD,因为监听器已获得完整的批处理。
public interface BatchMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data);
// 使用手动提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操作接收的所有ConsumerRecord实例。
public interface BatchAcknowledgingMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
// 使用自动提交或容器管理的提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操作接收的所有ConsumerRecord实例。使用此接口时不支持AckMode.RECORD,因为监听器已获得完整的批处理。提供对使用者对象的访问。
public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
// 使用手动提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操作接收的所有ConsumerRecord实例。提供对使用者对象的访问。
public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
上述消费者对象不是线程安全的。只能在调用侦听器的线程上调用其方法。
2.3.1.1 消息监听器容器
提供了两个MessageListenerContainer
的实现:
KafkaMessageListenerContainer
ConcurrentMessageListenerContainer
KafkaMessageListenerContainer
从单个线程上的所有主题或分区接收所有消息(即一个分区只能分配到一个消费者,一个消费者可以被分配多个分区)。ConcurrentMessageListenerContainer
委托给一个或多个KafkaMessageListenerContainer
实例,以提供多线程使用,从多线程上去处理主题或分区的所有消息。
从Spring Kafka2.2.7版开始,你可以将RecordInterceptor
添加到侦听器容器中;在调用侦听器以允许检查或修改记录之前,将调用它。如果拦截器返回null,则不调用侦听器。侦听器是批处理侦听器时不调用侦听器。从2.3版开始,CompositeRecordInterceptor
可用于调用多个拦截器。
默认情况下,使用事务时,侦听器在事务启动后调用。从2.3.4版开始,你可以设置侦听器容器的interceptBeforeTx
属性,以便在事务启动之前调用侦听器。没有为批处理侦听器提供侦听器,因为Kafka已经提供了ConsumerInterceptor
。
2.3.1.2 使用KafkaMessageListenerContainer
有如下构造函数可用:
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties,
TopicPartitionOffset... topicPartitions)
每个都获取一个ConsumerFactory
以及有关主题和分区的信息,以及ContainerProperties
对象中的其他配置。ConcurrentMessageListenerContainer
(稍后介绍)使用第二个构造函数跨使用者实例分发TopicPartitionOffset
。ContainerProperties
具有以下构造函数:
public ContainerProperties(TopicPartitionOffset... topicPartitions)
public ContainerProperties(String... topics)
public ContainerProperties(Pattern topicPattern)
第一个构造函数接受一个TopicPartitionOffset
参数数组来显式地指示容器要使用哪些分区(使用消费者的 assign()方法)和可选的初始偏移量。默认情况下,正值是绝对偏移量。默认情况下,负值是相对于分区内的当前最后偏移量。提供了TopicPartitionOffset
的构造函数,该构造函数接受一个附加的布尔参数。如果是true,则初始偏移(正偏移或负偏移)相对于该消耗器的当前位置。容器启动时应用偏移量。第二个是主题数组,Kafka基于group.id
属性:在组中分布分区来分配分区。第三个使用regex表达式来选择主题。
要将MessageListener
分配给容器,可以在创建容器时使用ContainerProps.setMessageListener
方法。下面的示例演示了如何执行此操作:
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
DefaultKafkaConsumerFactory<Integer, String> cf =
new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
注意当创建一个Defaultkafkafkaconsumerfactory
时,使用构造器,该构造器仅以其特性为基础,就意味着从配置中获取了key/value的Deserializer类别。或者,反序列化程序实例可以传递给key/value的DefaultKafkaConsumerFactory
构造函数,在这种情况下,所有消费者共享相同的实例。另一个选项是提供Supplier<Deserializer>s
(从版本2.3开始),用于为每个使用者获取单独的反序列化程序实例:
DefaultKafkaConsumerFactory<Integer, CustomValue> cf =
new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
有关可以设置的各种属性的更多信息,请参阅Javadoc 中ContainerProperties
。
从版本Spring Kafka 2.1.1开始,一个名为logContainerConfig
的新属性就可用了。当启用true和INFO日志记录时,每个侦听器容器都会写入一条日志消息,总结其配置属性。
例如,要将日志级别更改为INFO,可以使用containerProperties.setCommitLogLevel(LogIfLevelEnabled.level.INFO)
。
从版本Spring Kafka 2.2开始,添加了名为missingtopicsfailal
的新容器属性(默认值:true)。如果代理上不存在任何客户端发布或订阅涉及到的主题,这将阻止容器启动。如果容器配置为侦听主题模式(regex),则不适用。以前,容器线程在consumer.poll()
方法中循环,等待在记录许多消息时出现主题。除了日志,没有迹象表明有问题。要恢复以前的行为,可以将属性设置为false,这个时候,Broker设置项allow.auto.create.topics=true,且这个容器属性为false,则会自动创建不存在的topic。
2.3.1.3 使用 ConcurrentMessageListenerContainer
单个构造函数类似于第一个KafkaListenerContainer
构造函数。下面的列表显示了构造函数的签名:
public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
它还有一个并发属性。例如,container.setConcurrency(3)
即表示创建三个KafkaMessageListenerContainer
实例。对于第一个构造函数,Kafka使用它的组管理功能将分区分布到消费者之间。
当监听多个主题时,默认的分区分布可能不是你期望的那样。例如,如果你有三个主题,每个主题有五个分区,并且希望使用concurrency=15
,那么你只看到五个活动的消费者,每个消费者从每个主题中分配一个分区,其他十个消费者处于空闲状态。这是因为默认的Kafka PartitionAssignor
是RangeAssignor
(参见其Javadoc)。对于这种情况,你可能需要考虑改用RoundRobinAssignor
,它将分区分布到所有使用者。然后,为每个使用者分配一个主题或分区。若要更改PartitionAssignor
,你可以在提供给DefaultKafkaConsumerFactory
的属性中设置partition.assignment.strategy
消费者配置参数(ConsumerConfigs.PARTITION_ASSIGNMENT_STRATEGY_CONFIG
)。
使用Spring Boot时,可以按如下方式分配设置策略:
spring.kafka.consumer.properties.partition.assignment.strategy=\
org.apache.kafka.clients.consumer.RoundRobinAssignor
对于第二个构造函数,ConcurrentMessageListenerContainer
将TopicPartition
实例分布在委托KafkaMessageListenerContainer
实例上。
例如,如果提供了六个TopicPartition
实例,并发性为3;每个容器得到两个分区。对于五个TopicPartition
实例,两个容器得到两个分区,第三个容器得到一个分区。如果并发性大于TopicPartitions
的数量,则会向下调整并发性,以便每个容器获得一个分区。调整分区的方式可以使用命令行工具kafka-topics.sh
查询和调整主题上的分区数。还可以添加一个NewTopic
Bean,如果NewTopic设定的数目大于当前数目,spring boot的自动配置的KafkaAdmin
将向上调整分区。
client.id属性(如果已设置)将附加-n
,其中n是对应于并发的消费者实例。当启用JMX时,这是为MBeans提供唯一名称所必需的。
从版本Spring Kafka 1.3开始,MessageListenerContainer
提供了对底层KafkaConsumer
的度量的访问。对于ConcurrentMessageListenerContainer
,metrics()
方法返回所有目标KafkaMessageListenerContainer
实例的度量(metrics)。根据为底层KafkaConsumer
提供的client-id
度量被分组到Map<MetricName, ?extends Metric>
。
从2.3版开始,ContainerProperties
提供了一个idleBetweenPolls
选项,允许侦听器容器中的主循环在KafkaConsumer.poll()
调用之间睡眠。从提供的选项中选择实际睡眠间隔作为最小值,并且选择max.poll.interval.ms
消费者配置和当前记录批处理时间之间的差异。
2.3.1.4 提交偏移量
提供了几个提交偏移量的选项。如果enable.auto.commit
使用者属性为true
,则Kafka将根据其配置自动提交偏移量。如果为false
,则容器支持多个AckMode
设置(在下一个列表中描述)。默认的确认模式是批处理。从2.3版开始,框架将enable.auto.commit
设置为false
,除非在配置中显式设置。以前,如果未设置属性,则使用Kafka默认值(true)。消费者 poll()
方法返回一个或多个ConsumerRecords
。为每个记录调用MessageListener
。以下列表描述了容器对每个AckMode
采取的操作:
RECORD: 当侦听器在处理记录后返回时提交偏移量。
BATCH: 处理完poll()
返回的所有记录后提交偏移量。
TIME: 在处理完poll()
返回的所有记录后提交偏移量,只要超过上次提交后的ackTime
COUNT: 在处理完poll()
返回的所有记录后提交偏移量,只要上次提交后收到ackCount
记录。
COUNT_TIME: 类似于TIME
和COUNT
,但如果两个条件都为true,则执行提交。
MANUAL: 消息侦听器负责acknowledge()
和Acknowledgment
。之后,应用与BATCH相同的语义。
MANUAL_IMMEDIATE: 侦听器调用Acknowledgement.acknowledge()
方法时立即提交偏移量。
MANUAL和MANUAL_IMMEDIATE 要求侦听器是AcknowledgingMessageListener
或BatchAcknowledgingMessageListener
。请参见消息侦听器。
根据syncCommits
容器属性,使用消费者上的commitSync()
或commitAsync()
方法。默认情况下,syncCommits
为true;另请参阅setSyncCommitTimeout
。请参阅setCommitCallback
以获取异步提交的结果;默认回调是LoggingCommitCallback
,它记录错误(以及调试级别的成功)。
因为侦听器容器有自己的提交偏移的机制,所以它希望Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
为false。从2.3版开始,除非在使用者工厂或容器的使用者属性重写中特别设置,否则它将无条件地将其设置为false。
Acknowledgment
有以下方法:
public interface Acknowledgment {
void acknowledge();
此方法使侦听器可以控制何时提交偏移。
从版本2.3开始,确认接口有两个附加方法nack(long sleep)
和nack(int index, long sleep)
。第一个用于记录侦听器,第二个用于批处理侦听器。为侦听器类型调用错误的方法将引发IllegalStateException
。
nack()只能在调用侦听器的消费者线程上调用。
使用批处理侦听器时,可以在发生故障的批内指定索引。调用nack()
时,将在对失败和丢弃的记录的分区执行索引和查找之前提交记录的偏移量,以便在下次poll()
时重新传递这些偏移量。这是对SeekToCurrentBatchErrorHandler
的改进,SeekToCurrentBatchErrorHandler
只能查找整个批次以便重新交付。
注意:通过组管理使用分区分配时,确保sleep参数(加上处理上一次轮询记录所花费的时间)小于consumer max.poll.interval.ms
属性非常重要。
2.3.1.5 侦听器容器自动启动和手动启动
侦听器容器实现了SmartLifecycle
(通过SmartLifecycle
在Spring加载和初始化所有bean后,接着执行一些任务或者启动需要的异步服务),默认情况下autoStartup
为true
。容器在后期启动(Integer.MAX-VALUE - 100
)。实现SmartLifecycle
以处理来自侦听器的数据的其他组件应该在较早的阶段启动。-100
为以后的阶段留出了空间,使组件能够在容器之后自动启动。比如我们通过@Bean
将监听器容器交给Spring管理,这个时候通过SmartLifecycle
自动执行了初始化的任务,但是当我们手动通过new监听器容器实例,则后初始化则不会执行,比如KafkaMessageListenerContainer
实例需要手动执行start()
。
autoStartup
在手动执行start中设置true与false没有作用,可以参见@KafkaListener
声明周期管理这一小节。
2.3.2 @KafkaListener注解
2.3.2.1 Record Listeners
@KafkaListener
注解用于将bean方法指定为侦听器容器的侦听器。bean包装在一个MessagingMessageListenerAdapter
中,该适配器配置有各种功能,如转换器,用于转换数据(如有必要)以匹配方法参数。通过使用属性占位符(${…}
),或者可以使用SpEL(#{…}
)配置注释上的大多数属性。有关更多信息,请参阅Javadoc。
@KafkaListener
:
id
:listener唯一id,当GroupId没有被配置的时候,默认id为自动产生,此值指定后会覆盖group id。
containerFactory
:上面提到了@KafkaListener区分单数据还是多数据消费只需要配置一下注解的containerFactory属性就可以了,这里面配置的是监听容器工厂,也就是ConcurrentKafkaListenerContainerFactory
,配置Bean名称
topics
:需要监听的Topic,可监听多个,可以是表达式或者占位符关键字或者直接是主题名称,如多个主题监听:{"topic1" , "topic2"}
topicPattern
: 此侦听器的主题模式。条目可以是“主题模式”、“属性占位符键”或“表达式”。框架将创建一个容器,该容器订阅与指定模式匹配的所有主题,以获取动态分配的分区。模式匹配将针对检查时存在的主题周期性地执行。表达式必须解析为主题模式(支持字符串或模式结果类型)。这使用组管理,Kafka将为组成员分配分区。
topicPartitions
:用于使用手动主题/分区分配时
errorHandler
:监听异常处理器,配置Bean名称,默认为空
groupId
:消费组ID
idIsGroup
:id是否为GroupId
clientIdPrefix
:消费者Id前缀
beanRef
:真实监听容器的Bean名称,需要在 Bean名称前加 "__"
@KafkaListener
注解为简单的POJO侦听器提供了一种机制。下面的示例演示如何使用它:
public class Listener {
@KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
public void listen(String data) {
此机制生效需要@Configuration
类之一上的@EnableKafka
注解和用于配置基础ConcurrentMessageListenerContainer
的侦听器容器工厂。默认情况下,需要名为kafkaListenerContainerFactory
的bean。以下示例演示如何使用ConcurrentMessageListenerContain
:
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
return props;
注意,要设置容器属性,必须在工厂上使用getContainerProperties()
方法。它用作注入容器的实际属性的模板。
从版本2.1.1开始,现在可以为注解创建的消费者设置client.id
属性。clientdprefix
的后缀是-n
,其中n是一个整数,表示使用并发时的容器号。
从2.2版开始,现在可以通过使用批注本身的属性来重写容器工厂的并发性和自动启动属性。属性可以是简单值、属性占位符或SpEL表达式。下面的示例演示了如何执行此操作:
@KafkaListener(id = "myListener", topics = "myTopic",
autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
public void listen(String data) {
你还可以使用显式主题和分区(以及可选的初始偏移量)配置POJO侦听器。下面的示例演示了如何执行此操作:
@KafkaListener(id = "thing2", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
public void listen(ConsumerRecord<?, ?> record) {
你可以在partitions
或partitionOffsets
属性中指定每个分区,但不能同时指定两者。
使用手动AckMode
时,还可以向侦听器提供Acknowledgment
。下面的示例还演示了如何使用不同的容器工厂:
@KafkaListener(id = "cat", topics = "myTopic",
containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
ack.acknowledge();
最后,可以从消息头获得有关消息的元数据。你可以使用以下头名称来检索消息头内容:
KafkaHeaders.OFFSET
KafkaHeaders.RECEIVED_MESSAGE_KEY
KafkaHeaders.RECEIVED_TOPIC
KafkaHeaders.RECEIVED_PARTITION_ID
KafkaHeaders.RECEIVED_TIMESTAMP
KafkaHeaders.TIMESTAMP_TYPE
@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
2.3.2.2 批处理侦听器
从版本1.1开始,可以配置@KafkaListener
方法来接收从消费者接收的整批消费者记录。要将侦听器容器工厂配置为创建批处理侦听器,可以设置batchListener
属性。下面的示例演示了如何执行此操作:
@Bean
public KafkaListenerContainerFactory<?, ?> batchFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
return factory;
以下示例显示如何接收有效载荷列表:
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {
主题、分区、偏移量等在与有效负载并行的头中可用。下面的示例演示如何使用标题:
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Integer> keys,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
或者,您可以接收消息列表Message<?>
对象,其中包含每个偏移量和每个消息中的其他详细信息,但它必须是唯一的参数(除了使用手动提交时的Acknowledgment和/
或Consumer<?, ?>
参数)。下面的示例演示如何执行此操作:
@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory")
public void listen14(List<Message<?>> list) {
@KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen15(List<Message<?>> list, Acknowledgment ack) {
@KafkaListener(id = "listMsgAckConsumer", topics = "myTopic", containerFactory = "batchFactory")
public void listen16(List<Message<?>> list, Acknowledgment ack, Consumer<?, ?> consumer) {
在这种情况下,不会对有效载荷执行转换。如果BatchMessagingMessageConverter
配置了RecordMessageConverter
,则还可以向消息参数添加泛型类型,并转换有效负载。有关详细信息,请参阅使用批处理侦听器的负载转换。
你还可以收到一个ConsumerRecord<?, ?>
对象,但它必须是唯一的参数(当使用手动提交或Consumer<?, ?>
参数时,除了可选的Acknowledgment)。下面的示例演示了如何执行此操作:
@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list) {
@KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list, Acknowledgment ack) {
从版本2.2开始,侦听器可以接收poll()
方法返回的完整的ConsumerRecords<?, ?>
对象,允许侦听器访问其他方法,例如partitions()
(返回列表中的TopicPartition
实例)和records
(TopicPartition)(获取选择性记录)。同样,这必须是唯一的参数(当使用手动提交或Consumer<?, ?>
参数时,除了可选的Acknowledgment)。下面的示例演示了如何执行此操作:
@KafkaListener(id = "pollResults", topics = "myTopic", containerFactory = "batchFactory")
public void pollResults(ConsumerRecords<?, ?> records) {
2.3.3 @KafkaListener@Payload验证
从2.2版开始,现在更容易添加验证程序来验证@KafkaListener``@Payload
参数。以前,你必须配置一个自定义的DefaultMessageHandlerMethodFactory
并将其添加到注册器中。现在,你可以将验证器添加到注册器本身。以下代码说明了如何执行此操作:
@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
registrar.setValidator(new MyValidator());
当你在Spring Boot使用validation starter
,会自动配置LocalValidatorFactoryBean
,如下例所示:
@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {
@Autowired
private LocalValidatorFactoryBean validator;
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
registrar.setValidator(this.validator);
以下示例演示如何验证:
public static class ValidatedClass {
@Max(10)
private int bar;
public int getBar() {
return this.bar;
public void setBar(int bar) {
this.bar = bar;
@KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler",
containerFactory = "kafkaJsonListenerContainerFactory")
public void validatedListener(@Payload @Valid ValidatedClass val) {
@Bean
public KafkaListenerErrorHandler validationErrorHandler() {
return (m, e) -> {
2.3.4 重新平衡监听者
ContainerProperties
有一个名为consumerRebalanceListener
的属性,该属性接受Kafka客户端的consumerRebalanceListene
r接口的实现。如果未提供此属性,则容器将配置日志侦听器,该侦听器将在信息级别记录重新平衡事件。该框架还添加了一个子接口ConsumerRawareRebalanceListener
。以下列表显示了ConsumerRawareRebalanceListener
接口定义:
public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener {
void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
2.3.5 转发监听者消息
从2.0版开始,如果还使用@SendTo
注解注释@KafkaListener
,并且方法调用返回结果,则结果将转发到@SendTo
指定的主题。如:
@KafkaListener(topics = "annotated21")
@SendTo("!{request.value()}") // runtime SpEL
public String replyingListener(String in) {
@KafkaListener(topics = "${some.property:annotated22}")
@SendTo("#{myBean.replyTopic}") // config time SpEL
public Collection<String> replyingBatchListener(List<String> in) {
@KafkaListener(topics = "annotated23", errorHandler = "replyErrorHandler")
@SendTo("annotated23reply") // static reply topic definition
public String replyingListenerWithErrorHandler(String in) {
@KafkaListener(topics = "annotated25")
@SendTo("annotated25reply1")
public class MultiListenerSendTo {
@KafkaHandler
public String foo(String in) {
@KafkaHandler
@SendTo("!{'annotated25reply2'}")
public String bar(@Payload(required = false) KafkaNull nul,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) {
2.3.6 @KafkaListener生命周期管理
为@KafkaListener
注解创建的侦听器容器不是应用程序上下文中的bean。相反,它们是用KafkaListenerEndpointRegistry
类型的基础设施bean注册的。这个bean由框架自动声明并管理容器的生命周期;它将自动启动任何autoStartup
设置为true
的容器。所有容器工厂创建的所有容器必须处于同一phase
。有关详细信息,请参阅侦听器容器自动启动。你可以使用注册表以编程方式管理生命周期。启动或停止注册表将启动或停止所有已注册的容器。或者,可以通过使用单个容器的id属性来获取对该容器的引用。可以在批注上设置autoStartup
,这将覆盖容器工厂中配置的默认设置(setAutoStartup(true)
)。你可以从应用程序上下文中获取对bean的引用,例如自动连接,以管理其注册的容器。以下示例说明了如何执行此操作:
@KafkaListener(id = "myContainer", topics = "myTopic", autoStartup = "false")
public void listen(...) { ... }
@Autowired
private KafkaListenerEndpointRegistry registry;
this.registry.getListenerContainer("myContainer").start();
注册表只维护其管理的容器的生命周期;声明为bean的容器不受注册表管理,可以从应用程序上下文中获取。可以通过调用注册表的getListenerContainers()
方法来获取托管容器的集合。Spring Kafka版本2.2.5添加了一个方便方法getAllListenerContainers()
,它返回所有容器的集合,包括由注册表管理的容器和声明为bean的容器。返回的集合将包括任何已初始化的原型bean,但它不会初始化任何延迟bean声明。
2.4 流处理
Spring for Apache Kafka
提供了一个工厂bean来创建StreamsBuilder
对象并管理其流的生命周期。只要kafka流在classpath上并且kafka流通过@EnableKafkaStreams
注解开启,Spring Boot就会自动配置所需的KafkaStreamsConfiguration
bean。
启用Kafka流意味着必须设置应用程序id和引导服务器(bootstrap servers)。前者可以使用spring.kafka.streams.application-id
配置,如果未设置,则默认为spring.application.name
。后者可以全局设置,也可以专门为流覆写。
使用专用属性可以使用其他几个属性;可以使用spring.Kafka.streams.properties
命名空间设置其他任意Kafka属性。有关详细信息,Additional Kafka Properties 。
默认情况下,由它创建的StreamBuilder
对象管理的流将自动启动。可以使用spring.kafka.streams.auto-startup
属性自定义此行为。
要使用工厂bean,只需将StreamsBuilder
连接到@bean
,如下例所示:
@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
public static class KafkaStreamsExampleConfiguration {
@Bean
public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
stream.map((k, v) -> new KeyValue<>(k, v.toUpperCase())).to("ks1Out",
Produced.with(Serdes.Integer(), new JsonSerde<>()));
return stream;
默认情况下,由它创建的StreamBuilder
对象管理的流将自动启动。可以使用spring.kafka.streams.auto-startup
属性自定义此行为。
2.5 附加配置
自动配置支持的属性显示在公用应用程序属性中。注意,在大多数情况下,这些属性(连字符或驼峰样式)直接映射到Apache Kafka点式属性。有关详细信息,请参阅Apache Kafka
文档。
前面提到的几个属性应用于所有组件(生产者、消费者、管理员和流),但如果希望使用不同的值,则可以在组件级别指定。Apache Kafka指定重要性为HIGH
、MEDIUM
或LOW
的属性。Spring Boot自动配置支持所有高重要性属性、某些选定的中、低属性以及任何没有默认值的属性。
只有Kafka支持的属性的一个子集可以通过KafkaProperties
类直接使用,如果要使用不直接支持的其他属性配置生产者或消费者,请使用以下属性:
spring.kafka.properties.prop.one=first
spring.kafka.admin.properties.prop.two=second
spring.kafka.consumer.properties.prop.three=third
spring.kafka.producer.properties.prop.four=fourth
spring.kafka.streams.properties.prop.five=fifth
上面的参数设置示例将公共prop.one
Kafka属性设置为first
(适用于生产者、消费者和管理员),prop.two
admin属性设置为second
,prop.three
consumer属性设置为third
,prop.four
producer属性设置为fourth
,prop.five
streams属性设置为fifth
。
你还可以配置Spring Kafka JsonDeserializer
,如下所示:
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.Invoice
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example,org.acme
类似地,可以禁用JsonSerializer
在头中发送类型信息的默认行为:
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.add.type.headers=false
注意: 以这种方式设置的属性将覆盖Spring Boot显式支持的任何配置项。
2.6 使用Embdded Kafka做测试
Spring for Apache Kafka提供了一种使用嵌入式Apache Kafka代理测试项目的便捷方法。要使用此功能,请使用Spring Kafka测试模块中的@EmbeddedKafka
注解测试类。有关更多信息,请参阅Spring For Apache Kafka参考手册。
要使Spring Boot自动配置与前面提到的嵌入式Apache Kafka代理一起工作,需要将嵌入式代理地址(由EmbeddedKafkaBroker
填充)的系统属性重新映射到Apache Kafka的Spring Boot配置属性中。有几种方法可以做到这一点:
提供系统属性以将嵌入的代理地址映射到测试类中的spring.kafka.bootstrap-servers
:
static {
System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
在@EmbeddedKafka
注解上配置属性名:
@EmbeddedKafka(topics = "someTopic",
bootstrapServersProperty = "spring.kafka.bootstrap-servers")
在配置属性中使用占位符:
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
2.7 Spring Integration支持
Spring Integration也有Kafka的适配器,因此我们可以很方便的采用Spring Integration去实现发布订阅,当然你也可以不使用Spring Integration。
Spring Integration是什么,具体有什么作用,可以参考另一篇文章《Spring Integration最详解》。
3 Spring Kafka配置参数
这里对所有配置做个说明的是,spring kafka配置分全局配置和子模块配置,子模块配置会复写全局配置,比如SSL认证可以全局配置,但是也可以在每个子模块,如消费者、生产者、流式处理中都可以单独配置SSL(可能是微服务部署,消费者和生产者不在同一个应用中)。这里重点介绍生产者和消费者配置吧,其他就不展开了,用到的时候再去查找和补充。
3.1 全局配置
# 用逗号分隔的主机:端口对列表,用于建立到Kafka群集的初始连接。覆盖全局连接设置属性
spring.kafka.bootstrap-servers
# 在发出请求时传递给服务器的ID。用于服务器端日志记录
spring.kafka.client-id,默认无
# 用于配置客户端的其他属性,生产者和消费者共有的属性
spring.kafka.properties.*
# 消息发送的默认主题,默认无
spring.kafka.template.default-topic
3.2 生产者
Spring Boot中,Kafka 生产者
相关配置(所有配置前缀为spring.kafka.producer.
):
# 生产者要求Leader在考虑请求完成之前收到的确认数
spring.kafka.producer.acks
# 默认批量大小。较小的批处理大小将使批处理不太常见,并可能降低吞吐量(批处理大小为零将完全禁用批处理)
spring.kafka.producer.batch-size
spring.kafka.producer.bootstrap-servers
# 生产者可用于缓冲等待发送到服务器的记录的总内存大小。
spring.kafka.producer.buffer-memory
# 在发出请求时传递给服务器的ID。用于服务器端日志记录。
spring.kafka.producer.client-id
# 生产者生成的所有数据的压缩类型
spring.kafka.producer.compression-type
# 键的序列化程序类
spring.kafka.producer.key-serializer
spring.kafka.producer.properties.*
# 大于零时,启用失败发送的重试次数
spring.kafka.producer.retries
spring.kafka.producer.ssl.key-password
spring.kafka.producer.ssl.key-store-location
spring.kafka.producer.ssl.key-store-password
spring.kafka.producer.ssl.key-store-type
spring.kafka.producer.ssl.protocol
spring.kafka.producer.ssl.trust-store-location
spring.kafka.producer.ssl.trust-store-password
spring.kafka.producer.ssl.trust-store-type
# 非空时,启用对生产者的事务支持
spring.kafka.producer.transaction-id-prefix
spring.kafka.producer.value-serializer
3.3 消费者
Spring Boot中,Kafka 消费者相关配置(所有配置前缀为spring.kafka.consumer.
):
# 如果“enable.auto.commit”设置为true,设置消费者偏移自动提交到Kafka的频率,默认值无,单位毫秒(ms)
spring.kafka.consumer.auto-commit-interval
# 当Kafka中没有初始偏移或服务器上不再存在当前偏移时策略设置,默认值无,latest/earliest/none三个值设置
# earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
# latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
# none topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
spring.kafka.consumer.auto-offset-reset
# 用逗号分隔的主机:端口对列表,用于建立到Kafka群集的初始连接。覆盖全局连接设置属性
spring.kafka.consumer.bootstrap-servers
# 在发出请求时传递给服务器的ID,用于服务器端日志记录
spring.kafka.consumer.client-id
# 消费者的偏移量是否在后台定期提交
spring.kafka.consumer.enable-auto-commit
# 如果没有足够的数据来立即满足“fetch-min-size”的要求,则服务器在取回请求之前阻塞的最大时间量
spring.kafka.consumer.fetch-max-wait
# 服务器应为获取请求返回的最小数据量。
spring.kafka.consumer.fetch-min-size
# 标识此消费者所属的默认消费者组的唯一字符串
spring.kafka.consumer.group-id
# 消费者协调员的预期心跳间隔时间。
spring.kafka.consumer.heartbeat-interval
# 用于读取以事务方式写入的消息的隔离级别。
spring.kafka.consumer.isolation-level
# 密钥的反序列化程序类
spring.kafka.consumer.key-deserializer
# 在对poll()的单个调用中返回的最大记录数。
spring.kafka.consumer.max-poll-records
# 用于配置客户端的其他特定于消费者的属性。
spring.kafka.consumer.properties.*
# 密钥存储文件中私钥的密码。
spring.kafka.consumer.ssl.key-password
# 密钥存储文件的位置。
spring.kafka.consumer.ssl.key-store-location
# 密钥存储文件的存储密码。
spring.kafka.consumer.ssl.key-store-password
# 密钥存储的类型,如JKS
spring.kafka.consumer.ssl.key-store-type
# 要使用的SSL协议,如TLSv1.2, TLSv1.1, TLSv1
spring.kafka.consumer.ssl.protocol
# 信任存储文件的位置。
spring.kafka.consumer.ssl.trust-store-location
# 信任存储文件的存储密码。
spring.kafka.consumer.ssl.trust-store-password
# 信任存储区的类型。
spring.kafka.consumer.ssl.trust-store-type
# 值的反序列化程序类。
spring.kafka.consumer.value-deserializer
3.4 监听器
Spring Boot中,Kafka Listener相关配置(所有配置前缀为spring.kafka.listener.
):
# ackMode为“COUNT”或“COUNT_TIME”时偏移提交之间的记录数
spring.kafka.listener.ack-count=
spring.kafka.listener.ack-mode
spring.kafka.listener.ack-time
spring.kafka.listener.client-id
spring.kafka.listener.concurrency
spring.kafka.listener.idle-event-interval
spring.kafka.listener.log-container-config
# 如果Broker上不存在至少一个配置的主题(topic),则容器是否无法启动,
# 该设置项结合Broker设置项allow.auto.create.topics=true,如果为false,则会自动创建不存在的topic
spring.kafka.listener.missing-topics-fatal=true
# 非响应消费者的检查间隔时间。如果未指定持续时间后缀,则将使用秒作为单位
spring.kafka.listener.monitor-interval
spring.kafka.listener.no-poll-threshold
spring.kafka.listener.poll-timeout
spring.kafka.listener.type
3.5 管理
spring.kafka.admin.client-id
# 如果启动时代理不可用,是否快速失败
spring.kafka.admin.fail-fast=false
spring.kafka.admin.properties.*
spring.kafka.admin.ssl.key-password
spring.kafka.admin.ssl.key-store-location
spring.kafka.admin.ssl.key-store-password
spring.kafka.admin.ssl.key-store-type
spring.kafka.admin.ssl.protocol
spring.kafka.admin.ssl.trust-store-location
spring.kafka.admin.ssl.trust-store-password
spring.kafka.admin.ssl.trust-store-type
3.6 授权服务(JAAS)
spring.kafka.jaas.control-flag=required
spring.kafka.jaas.enabled=false
spring.kafka.jaas.login-module=com.sun.security.auth.module.Krb5LoginModule
spring.kafka.jaas.options.*
3.7 SSL认证
spring.kafka.ssl.key-password
spring.kafka.ssl.key-store-location
spring.kafka.ssl.key-store-password
spring.kafka.ssl.key-store-type
spring.kafka.ssl.protocol
spring.kafka.ssl.trust-store-location
spring.kafka.ssl.trust-store-password
spring.kafka.ssl.trust-store-type
3.8 Stream流处理
spring.kafka.streams.application-id
spring.kafka.streams.auto-startup
spring.kafka.streams.bootstrap-servers
spring.kafka.streams.cache-max-size-buffering
spring.kafka.streams.client-id
spring.kafka.streams.properties.*
spring.kafka.streams.replication-factor
spring.kafka.streams.ssl.key-password
spring.kafka.streams.ssl.key-store-location
spring.kafka.streams.ssl.key-store-password
spring.kafka.streams.ssl.key-store-type
spring.kafka.streams.ssl.protocol
spring.kafka.streams.ssl.trust-store-location
spring.kafka.streams.ssl.trust-store-password
spring.kafka.streams.ssl.trust-store-type
spring.kafka.streams.state-dir
4 Kafka订阅发布基本特性回顾
同一消费组下所有消费者协同消费订阅主题的所有分区
同消费组,多消费者订阅单主题单分区,则分区只会分配给其中一个消费者,除非这个消费者挂掉,才会分配给其他一个消费者消费消息,意思就是其他消费者在旁边看着吃东西
同消费组,N个消费者订阅单主题N个分区,则默认每个消费者都会被分配一个分区
同消费组,N个消费者订阅单主题M个分区,当M > N时,则会有消费者多分配多于一个分区的情况;当M < N时,则会有空闲消费者,类似第一条
所有上面所说的消费者实例可以是线程方式或者是进程方式存在,所说的分区分配机制叫做重平衡(rebalance)
当消费者内成员个数发生变化会触发重平衡;订阅的主题个数发生变化会触发重平衡;订阅的主题分区个数发生变化会触发重平衡;
总之就是一个分区只能分配到一个消费者,一个消费者可以被分配多个分区
消费者offset管理机制
每个主题分区中的消息都有一个唯一偏移值,具有先后顺序,与消费者具有对应关系,消费者每消费一条消息,偏移量加1,并记录在消费者本地,并定期的将记录同步到服务端(Broker),这里的同步机制是可以设置的
消息是被持久化的,当组内所有消费者重新订阅主题时,可以设置是否从头开始消费消息或者是从最后记录的偏移值位置开始消费
分区和消费者个数如何设置
我们知道主题分区是分布在不同的Broker上的,每个分区对应一个消费者,从而具有消息处理具有很高的吞吐量
分区是调优Kafka并行度的最小单元,多线程消费者连接多分区消费消息,在实现上,通过socket连接,因此也会占用文件句柄个数
创建分区都是会占用一定内存的,并不是分区越多越好,当然现在kafka社区在优化这一部分,让分区数达到更大,性能也不会有所影响
具体怎么调优副本、分区、消费者等这里就不展开了,后面专门来研究这个问题。
5 发布订阅示例
实现下面的示例需要的环境:
Kafka + Zookeeper单点服务器或集群已配置好(如果环境搭建不熟悉,可以去翻看前面写的关于Kafka的环境搭建和测试那一篇),或者是使用Spring-kafka-test
embedded Kafka Server
Spring Boot开发环境(2.2.1)
JDK(1.8或以上)
STS(4.4.RELEASE)
MARVEN构建方式
5.1 使用Embedded Kafka Server
我们知道Kafka是Scala+Zookeeper
构建的,可以从官方网站下载部署包并在本地部署。不过,Spring Kafka Test已经封装了Kafka测试的带注解的一键式功能,以打开Kafka服务器,从而简化了验证Kafka相关功能的开发过程,使用起来也非常简单。
添加依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
启动服务,下面使用Junit测试用例直接启动Kafka服务器服务,包括四个代理节点,Run as JUnit Test
。:
@RunWith(SpringRunner.class)
@SpringBootTest(classes = ApplicationTests.class)
@EmbeddedKafka(count = 4,ports = {9092,9093,9094,9095})
public class ApplicationTests {
@Test
public void contextLoads()throws IOException {
System.in.read();
@EmbeddedKafka
中可以设置相关参数:
value: 设置创建代理的个数
count: 同value
ports: 代理端口号列表
brokerPropertiesLocation:指定配置文件,如 "classpath:application.properties"
注意:EmbeddedKafka这样默认是没有创建主题的。会提示Topic(s) [test] is/are not present and missingTopicsFatal is true
错误。@EmbeddedKafka默认情况是创建一个代理,该代理具有一个不带任何参数的随机端口,它将在启动日志中输出特定端口和默认配置项。
5.2 简单的发布订阅实现(无自定义配置)
下面实现一个简单发布订阅功能,通过前端WEB调用一个API,然后在该API控制器中得到请求后生产者开始发送消息,消费者后台监听消息,如果收到消费者消息,则打印出来。
5.2.1 添加依赖及配置Kafka
添加Kafka依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
配置Kafka,这里消费者和生产者在同一应用中,我们只需要配置Kafka Brokers的服务地址+端口:
server:
port: 9000
spring:
kafka:
bootstrap-servers: 10.151.113.57:9092,10.151.113.57:9093,10.151.113.57:9094
listener:
# 设置不监听主题错误,false时,如果broker设置了llow.auto.create.topics = true,生产者发送到未创建主题时,会默认自动创建主题
# 且默认创建的主题是单副本单分区的
missing-topics-fatal: false
consumer:
# 配置消费者消息offset是否自动重置(消费者重连会能够接收最开始的消息)
auto-offset-reset: earliest
5.2.2 添加生产者
@Service
public class Producer {
private static final Logger LOGGER = LogManager.getLogger(Producer.class);
private static final String TOPIC = "users";
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String message) {
LOGGER.info(String.format("===Producing message: {}", message));
this.kafkaTemplate.send(TOPIC, message);
5.2.3 添加消费者
@Service
public class Consumer {
private static final Logger LOGGER = LogManager.getLogger(Consumer.class);
@KafkaListener(topics = "test", groupId = "group_test")
public void consume(String message) throws IOException {
LOGGER.info(String.format("#### -> Consumed message -> %s", message));
5.2.4 添加WEB控制器
@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {
private final Producer producer;
@Autowired
KafkaController(Producer producer) {
this.producer = producer;
@GetMapping(value = "/publish")
public void sendMessageToKafkaTopic(@RequestParam("message") String message) {
this.producer.sendMessage(message);
5.2.5 测试
添加Spring Boot Application:
@SpringBootApplication
public class TestKafkaApplication {
public static void main(String[] args) {
SpringApplication.run(TestKafkaApplication.class, args);
启动Kafka Brokers后,需要手动创建主题(如果想自动创建,则需要借助KafkaAdmin,或者是Kafka Broker设置了allow.auto.create.topics=true
且应用设置了listener.missing-topics-fatal=false
):
# 如果对kafka-topics.sh这里不熟悉,可以去翻看前面写的关于Kafka的相关文章(环境搭建和测试那一篇)
# 创建test主题
$ ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --replication-factor 1 --partitions 2 --topic test
打开浏览器测试:
http://localhost:9000/kafka/publish?message=hello
则应用控制台会打印hello
。整个发布订阅的实现只使用了跟Kafka相关的@KafkaListener
注解接收消息和KafkaTemplate
模板发送消息,很是简单。
5.3 基于自定义配置发布订阅实现
上面是简单的通过Spring Boot依赖的Spring Kafka配置即可快速实现发布订阅功能,这个时候我们是无法在程序中操作这些配置的,因此这一小节就是利用我们之前《Spring Boot从零入门7_最新配置文件配置及优先级详细介绍》文章中讲述的自定义配置文件方式去实现发布订阅功能。
实现内容有:
自定义Kafka配置参数文件(非application.properties/yml)
可实现多生产者(每个生产者为单服务单线程),多消费者(非@KafkaListener
实现消息监听)
支持SSL安全配置
监听生产者
源码不会直接贴,只给出主体部分。
配置文件:
@Configuration
@ConfigurationProperties(prefix = "m2kc")
@PropertySource("classpath:kafka.properties")
@Validated
public class M2KCKafkaConfig {
@Value("${m2kc.kafka.bootstrap.servers}")
private String kafkaBootStrapServers;
@Value("${m2kc.kafka.key.serializer.class}")
private String kafkaKeySerializerClass;
......
......
@Service
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class KafkaProducer {
private static final Logger LOGGER = LogManager.getLogger(KafkaProducer.class);
private String mTopic = "test";
private M2KCKafkaConfig mM2KCKafkaConfig;
private KafkaTemplate<String, String> mKafkaTemplate;
@Autowired
public KafkaProducer(M2KCKafkaConfig kafkaConfig) {
mTopic = kafkaConfig.getKafkaSourceTopic();
mM2KCKafkaConfig = kafkaConfig;
mKafkaTemplate = getKafkaTemplate();
public KafkaTemplate<String, String> getKafkaTemplate() {
KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<String, String>(producerFactory());
return kafkaTemplate;
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> properties = new HashMap<String, Object>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, mM2KCKafkaConfig.getKafkaBootStrapServers());
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, mM2KCKafkaConfig.getKafkaKeySerializerClass());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, mM2KCKafkaConfig.getKafkaValueSerializerClass());
if (mM2KCKafkaConfig.isKafkaSslEnable()) {
// TODO : to test
properties.put("security.protocol", mM2KCKafkaConfig.getKafkaSslProtocol());
properties.put("ssl.truststore.location", mM2KCKafkaConfig.getKafkaSslStoreLocation());
properties.put("ssl.truststore.password", mM2KCKafkaConfig.getKafkaSslTrustStorePassword());
properties.put("ssl.keystore.location", mM2KCKafkaConfig.getKafkaSslStoreLocation());
properties.put("ssl.keystore.password", mM2KCKafkaConfig.getKafkaSslKeyStorePassword());
properties.put("ssl.key.password", mM2KCKafkaConfig.getKafkaSslKeyPassword());
return new DefaultKafkaProducerFactory<String, String>(properties);
public void sendMessage(String msg) {
LOGGER.info("===Producing message[{}]: {}", mTopic, msg);
ListenableFuture<SendResult<String, String>> future = mKafkaTemplate.send(mTopic, msg);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
LOGGER.info("===Producing message success");
@Override
public void onFailure(Throwable ex) {
LOGGER.info("===Producing message failed");
@Service
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class KafkaConsumer implements InitializingBean {
private static final Logger LOGGER = LogManager.getLogger(KafkaConsumer.class);
private String mTopic;
private M2KCKafkaConfig mM2KCKafkaConfig;
private KafkaMessageListenerContainer<String, String> mKafkaMessageListenerContainer;
@Autowired
public KafkaConsumer(M2KCKafkaConfig kafkaConfig) {
LOGGER.info("===KafkaConsumer construct");
mTopic = kafkaConfig.getKafkaSourceTopic();
mM2KCKafkaConfig = kafkaConfig;
@PostConstruct
public void start(){
LOGGER.info("===KafkaConsumer start");
@Override
public void afterPropertiesSet() throws Exception {
LOGGER.info("===afterPropertiesSet is called");
createContainer();
private void createContainer() {
mKafkaMessageListenerContainer = createKafkaMessageListenerContainer();
mKafkaMessageListenerContainer.setAutoStartup(false);;
mKafkaMessageListenerContainer.start();
LOGGER.info("===", mKafkaMessageListenerContainer);
private KafkaMessageListenerContainer<String, String> createKafkaMessageListenerContainer() {
KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(consumerFactory(),
createContainerProperties());
LOGGER.info("===createKafkaMessageListenerContainer");
return container;
private ContainerProperties createContainerProperties() {
ContainerProperties containerProps = new ContainerProperties(mTopic);
containerProps.setMessageListener(createMessageListener());
return containerProps;
private ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> properties = new HashMap<String, Object>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, mM2KCKafkaConfig.getKafkaBootStrapServers());
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, mM2KCKafkaConfig.getKafkaKeyDeserializerClass());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, mM2KCKafkaConfig.getKafkaValueDeserializerClass());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, mM2KCKafkaConfig.getKafkaConsumerGroupID());
if (mM2KCKafkaConfig.isKafkaSslEnable()) {
// TODO : to test
properties.put("security.protocol", mM2KCKafkaConfig.getKafkaSslProtocol());
properties.put("ssl.truststore.location", mM2KCKafkaConfig.getKafkaSslStoreLocation());
properties.put("ssl.truststore.password", mM2KCKafkaConfig.getKafkaSslTrustStorePassword());
properties.put("ssl.keystore.location", mM2KCKafkaConfig.getKafkaSslStoreLocation());
properties.put("ssl.keystore.password", mM2KCKafkaConfig.getKafkaSslKeyStorePassword());
properties.put("ssl.key.password", mM2KCKafkaConfig.getKafkaSslKeyPassword());
return new DefaultKafkaConsumerFactory<String, String>(properties);
private MessageListener<String, String> createMessageListener() {
return new MessageListener<String, String>() {
@Override
public void onMessage(ConsumerRecord<String, String> data) {
// TODO Auto-generated method stub
LOGGER.info("===Consuming msg: {}", data.value());
继承InitializingBean
只是为了初始化,也可以去掉,将初始化写入了构造函数中。这里的消费者和生产者都使用@Scope
,所以需要手动获取实例,通过context去调用getBean()。另外配置文件没有写全,这里需要注意。
5.3 基于Spring Integration发布订阅实现
Spring Integration也有对Kafka支持的适配器,采用Spring Integration,我们也能够快速的实现发布订阅功能,且实现群组多消费者批量消费功能:
实现Kafka自定义配置类
采用Spring Integration
群组多消费者批量消费
采用DSL特定领域语法去编写
生产者发布成功与失败异常处理
我们可以先看看整体的Kafka消息传递通道:
出站通道中KafkaProducerMessageHandler用于将消息发送到主题
KafkaMessageDrivenChannelAdapter用于设置入站通道和消息处理
具体的Demo可以参考Github中的一个sample :
https://github.com/spring-projects/spring-integration-samples
本篇文章详细介绍了Spring Kafka的发送消息和接收消息功能,其他包括Spring Kafka Stream的简单介绍、Spring Kafka参数配置,以及在Spring Boot中如何通过三种方式去实现Kafka的发布订阅功能,涉及了Kafka的多消费者多订阅者,SSL安全传输,Spring Integration Kafka等。文章很长,把握总体,结合实际,差不多基本内容都有所涉及了。
7 知识扩展
Spring Expression Language(简称SpEL),在Spring中,不同于属性占位符${...}
,而SpEL
表达式则要放到#{...}
中(除代码块中用Expression外)。如配置文件中有topics参数spring.kafka.topics
,则可以将配置文件中参数传入注解@KafkaListener(id = "foo", topics = "#{'${topicOne:annotated1,foo}'.split(',')}")
。
SpEL
表达式常用示例:
// 字面量
#{3.1415926} // 浮点数
#{9.87E4} // 科学计数法表示98700
#{'Hello'} // String 类型
#{false} // Boolean 类型
// 引用Bean、属性和方法
#{sgtPeppers} // 使用这个bean
#{sgtPeppers.artist} // 引用bean中的属性
#{sgtPeppers.selectArtist()} // 引用bean中的方法
#{sgtPeppers.selectArtist().toUpperCase()} // 方法返回值的操作
#{sgtPeppers.selectArtist()?.toUpperCase()} // 防止selectArtist()方法返回null,?表示非null则执行toUpperCase()
// 访问类作用域的方法和常量的话,使用T()这个关键的运算符
#{T(java.lang.Math)}
#{T(java.lang.Math).PI} // 引用PI的值
#{T(java.lang.Math).random()} // 获取0-1的随机数
#{T(System).currentTimeMillis()} // 获取时间到当前的毫秒数
// 替代属性占位符获取配置文件属性值
@Value("#{表达式}"
private String variable;
8 参考资料
https://docs.spring.io/spring-kafka/docs/2.3.4.RELEASE/reference/html/
https://docs.spring.io/spring-boot/docs/current/reference/html/appendix-application-properties.html
https://blog.csdn.net/lishuangzhe7047/article/details/74530417
https://docs.spring.io/spring-boot/docs/2.2.0.RELEASE/reference/htmlsingle/#boot-features-kafka
https://docs.spring.io/spring-boot/docs/2.2.0.RELEASE/reference/htmlsingle/#common-application-properties
https://docs.spring.io/spring-kafka/api/org/springframework/kafka/annotation/EnableKafka.html
https://docs.spring.io/spring-kafka/api/org/springframework/kafka/annotation/KafkaListener.html
https://www.javatt.com/p/16904
https://github.com/cwenao/springboot_cwenao
https://docs.spring.io/spring-kafka/api/org/springframework/kafka/annotation/package-summary.html
https://docs.spring.io/spring-kafka/reference/html/#spring-integration
https://docs.spring.io/spring-batch/docs/current/reference/html/spring-batch-integration.html
https://www.intertech.com/Blog/spring-integration-enterprise-integration-veg-o-matic/
https://joshlong.com/jl/blogPost/spring_integration_adapters_gateways_and_channels.html
https://examples.javacodegeeks.com/enterprise-java/spring/integration/spring-integration-kafka-tutorial/
https://www.orchome.com/553
https://docs.spring.io/spring-integration/docs/5.1.0.RELEASE/reference/html/java-dsl.html
https://programming.vip/docs/spring-boot-integration-kafka-spring-kafka-in-depth-exploration.html (事务型消息)
https://docs.confluent.io/current/kafka/authentication_ssl.html
https://github.com/spring-projects/spring-kafka/issues/361
https://www.jianshu.com/p/27fd3754bb9c
https://www.jianshu.com/p/cec449a7e73a
https://memorynotfound.com/spring-kafka-batch-listener-example/