@KafkaListener(topics = "#{'${kafkaTopicName}'.split(',')}")
public void listeninstances(ConsumerRecord<?, ?> record){
logger.info("----------------- record =" + record);
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message =kafkaMessage.get();
String topic = record.topic();
logger.info("====topic=="+topic+"========message =" + message);
rmbKafkaClientService.loadData2Es(topic,message.toString());
logger.info("----------send-to-es-success--------");
只需要添加这个注解就可以
@KafkaListener(topics = "#{'${kafkaTopicName}'.split(',')}")
yml中文件为
kafkaTopicName: instances,modelida
用逗号分割
接到领导的一个需求,希望封装一下kafka的消费者,可以从配置读取topic进行消费;一开始首先想到的是用java kafka的高阶api手工根据topic创建消费者,一个topic创建一个消费者,依赖zookeeper完成kafka内部的balance和其他管理。后来领导又提出不要依赖zookeeper,之前老是rebalance失败。
调研了一下,手工实现类似sp...
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency..
topics: "admin,login,client"
@KafkaListener(topics = "#{'${topics}'.split(',')}",concurrency = "#{'${topics}'.split(',').length}")
2:在配置类中获取
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependenc
文章目录spring kafka 动态创建 topic 监听问题分析需要解决的问题解决方案Consumer topickafak 通用配置 设置timer 触发实现ConsumerMessageHelper 业务处理运行效果输出
spring kafka 动态创建 topic 监听
spring boot 已经对kafka 进行了很好的封装集成,只需要早配置文件中配置相应的配置参数,再配合
@KafkaListener 注解即可监听kafka 消息,但如果想动态监听某一类消息而不是固定的某几个
背景: 使用springboot整合kafka时, springboot默认读取配置文件中 spring.kafka...配置初始化kafka, 使用@KafkaListener时指定topic即可, 当服务中需要监听多个kafka时, 需要配置多个kafka, 这种方式不适用。4. @KafkaListener使用。
在Spring Boot应用程序中,您可以使用Spring Kafka库与Apache Kafka进行集成。注解用于指定一个固定的主题,但是有时候您可能需要动态地指定多个主题。在本文中,我将展示如何在Spring Boot应用程序中动态指定多个Kafka主题。这是一个简单的示例,展示了如何在Spring Boot应用程序中动态指定多个Kafka主题。现在,我们需要在应用程序的配置文件中指定要订阅的主题。在上面的示例中,我们使用逗号分隔的方式指定了两个主题。在上面的代码中,我们定义了两个使用。