添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
相关文章推荐
害羞的毛豆  ·  How to add a .lib ...·  1 周前    · 
礼貌的小马驹  ·  select·  3 周前    · 
刚分手的毛衣  ·  No QueryClient set, ...·  3 月前    · 

Spring Kafka—— KafkaListenerEndpointRegistry 隐式注册分析

由于我想在项目中实现基于 Spring kafka 动态连接 Kafka 服务,指定监听 Topic 并控制消费程序的启动和停止这样一个功能,所以就大概的了解了一下 Spring Kafka 的几个重要的类的概念,内容如下:

  1. ConsumerFactory
    • 作用:负责创建 Kafka 消费者的实例。 ConsumerFactory 是一个简单的工厂类,用于封装消费者的配置(如bootstrap servers, key deserializer, value deserializer等)并生成 Consumer 实例。
    • 用法:通常在Spring配置类中定义,并通过依赖注入提供给 KafkaListenerContainerFactory
  2. ConcurrentKafkaListenerContainerFactory
    • 作用:这个工厂类用于创建 ConcurrentMessageListenerContainer 实例,该容器管理多个 Kafka MessageListenerContainer 来提供并发消息消费。
    • 特点:可以设置并发消费的数量,即同时运行的 MessageListenerContainer 的数量。
      支持消息过滤、错误处理和事务管理。
    • 用法:在Spring配置类中定义,并设置其 ConsumerFactory 和其他相关配置。然后,可以通过 @KafkaListener 注解直接使用,Spring会自动使用这个工厂来创建监听器。
  3. KafkaListenerEndpointRegistry
    • 作用:这是一个管理类,用于管理应用中所有由 @KafkaListener 注解创建的消息监听器容器。
    • 特点:提供了启动和停止监听器的方法,可以在运行时控制监听器。
      可以用来查询当前所有注册的监听器的状态。
    • 用法:通常自动配置,可以通过自动注入到任何Spring管理的Bean中,用于运行时管理监听器。
  4. KafkaTemplate
    • 作用:这是一个高级抽象,用于生产消息到Kafka主题。
    • 特点:提供同步和异步发送消息的方法。
      支持事务消息发送。
    • 用法:定义在Spring配置类中,注入生产者工厂 ProducerFactory ,并用于应用中的消息发送。
  5. @KafkaListener
    作用:注解用于标记方法以作为Kafka消息的监听器,这些方法会自动被Spring容器管理,并在有新消息时触发。
    特点:
    可以指定主题、分区和消费组。
    支持并发消费。
    用法:放在组件的方法上,方法参数可以灵活地映射消息的key、value、headers等。

从上面的内容可以看到, KafkaListenerEndpointRegistry 这个类是管理消息监听容器的,并提供了启动和停止监听器的方法,于是我就想创建这个类来完成我的需求功能。当我直接写如下内容时:

@Component
public class KafkaConfig {@Autowiredprivate KafkaListenerEndpointRegistry registry;@PostConstructpublic void init() {System.out.println(registry);}

IDEA提示了 Could not autowire. No beans of ‘KafkaListenerEndpointRegistry’ type found. 但是我启动 SpringBoot 项目却没有报错 :
在这里插入图片描述
我在我的项目中是没有加 @EnableKafka 这样的注解的,代码如下:

@SpringBootApplication
public class SpringKafkaExampleApplication {public static void main(String[] args) {SpringApplication.run(SpringKafkaExampleApplication.class, args);}

引入的依赖:

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency></dependencies>

于是我就比较好奇,项目启动的时候是在什么地方声明了 KafkaListenerEndpointRegistry 这个 bean 的。

KafkaListenerEndpointRegistry 隐式注册分析

SpringBoot 对于 kafka 有如下的自动配置:

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(EnableKafka.class)
class KafkaAnnotationDrivenConfiguration {private final KafkaProperties properties;private final RecordMessageConverter recordMessageConverter;private final RecordFilterStrategy<Object, Object> recordFilterStrategy;private final BatchMessageConverter batchMessageConverter;private final KafkaTemplate<Object, Object> kafkaTemplate;private final KafkaAwareTransactionManager<Object, Object> transactionManager;private final ConsumerAwareRebalanceListener rebalanceListener;private final CommonErrorHandler commonErrorHandler;private final AfterRollbackProcessor<Object, Object> afterRollbackProcessor;private final RecordInterceptor<Object, Object> recordInterceptor;KafkaAnnotationDrivenConfiguration(KafkaProperties properties,ObjectProvider<RecordMessageConverter> recordMessageConverter,ObjectProvider<RecordFilterStrategy<Object, Object>> recordFilterStrategy,ObjectProvider<BatchMessageConverter> batchMessageConverter,ObjectProvider<KafkaTemplate<Object, Object>> kafkaTemplate,ObjectProvider<KafkaAwareTransactionManager<Object, Object>> kafkaTransactionManager,ObjectProvider<ConsumerAwareRebalanceListener> rebalanceListener,ObjectProvider<CommonErrorHandler> commonErrorHandler,ObjectProvider<AfterRollbackProcessor<Object, Object>> afterRollbackProcessor,ObjectProvider<RecordInterceptor<Object, Object>> recordInterceptor) {this.properties = properties;this.recordMessageConverter = recordMessageConverter.getIfUnique();this.recordFilterStrategy = recordFilterStrategy.getIfUnique();this.batchMessageConverter = batchMessageConverter.getIfUnique(() -> new BatchMessagingMessageConverter(this.recordMessageConverter));this.kafkaTemplate = kafkaTemplate.getIfUnique();this.transactionManager = kafkaTransactionManager.getIfUnique();this.rebalanceListener = rebalanceListener.getIfUnique();this.commonErrorHandler = commonErrorHandler.getIfUnique();this.afterRollbackProcessor = afterRollbackProcessor.getIfUnique();this.recordInterceptor = recordInterceptor.getIfUnique();}@Bean@ConditionalOnMissingBeanConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() {ConcurrentKafkaListenerContainerFactoryConfigurer configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer();configurer.setKafkaProperties(this.properties);configurer.setBatchMessageConverter(this.batchMessageConverter);configurer.setRecordMessageConverter(this.recordMessageConverter);configurer.setRecordFilterStrategy(this.recordFilterStrategy);configurer.setReplyTemplate(this.kafkaTemplate);configurer.setTransactionManager(this.transactionManager);configurer.setRebalanceListener(this.rebalanceListener);configurer.setCommonErrorHandler(this.commonErrorHandler);configurer.setAfterRollbackProcessor(this.afterRollbackProcessor);configurer.setRecordInterceptor(this.recordInterceptor);return configurer;}@Bean@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer,ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory) {ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();configurer.configure(factory, kafkaConsumerFactory.getIfAvailable(() -> new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties())));return factory;}@Configuration(proxyBeanMethods = false)@EnableKafka@ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)static class EnableKafkaConfiguration {}

可以看到这个配置类里面有一个静态的内部类 EnableKafkaConfiguration 该类上声明了 @EnableKafka 注解,也就是说内部静态类EnableKafkaConfiguration使用了@EnableKafka注解,并且通过@ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)确保如果Spring上下文中缺少相应的Bean,则自动激活@EnableKafka功能。这意味着,即便你没有在你的应用配置中显式添加@EnableKafka,这个内部类也可以根据条件自动注册所需的Bean,从而启用Kafka的支持。

@EnableKafka 定义如下:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(KafkaListenerConfigurationSelector.class)
public @interface EnableKafka {

这个注解的使用导致了KafkaListenerConfigurationSelector的激活,其源码如下:

@Order
public class KafkaListenerConfigurationSelector implements DeferredImportSelector {@Overridepublic String[] selectImports(AnnotationMetadata importingClassMetadata) {return new String[] { KafkaBootstrapConfiguration.class.getName() };}}

上面的代码中 DeferredImportSelector是Spring框架中一个特殊的接口,它继承自ImportSelector。它主要用于处理配置类的导入,允许更细致地控制配置类的加载顺序。这个接口特别适用于那些依赖于由Spring容器中其他Bean或配置动态决定的配置。
KafkaListenerConfigurationSelector 这个类实现了DeferredImportSelector并通过selectImports方法返回了一个配置类名称的数组。这个方法指定了当Spring处理到这个选择器时,它应该导入KafkaBootstrapConfiguration类。

KafkaBootstrapConfiguration 内容如下:

public class KafkaBootstrapConfiguration implements ImportBeanDefinitionRegistrar {@Overridepublic void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {if (!registry.containsBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)) {registry.registerBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME,new RootBeanDefinition(KafkaListenerAnnotationBeanPostProcessor.class));}if (!registry.containsBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)) {registry.registerBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,new RootBeanDefinition(KafkaListenerEndpointRegistry.class));}}}

KafkaBootstrapConfiguration 是一个实现了ImportBeanDefinitionRegistrar接口的类,主要用于程序化地注册Bean定义到Spring的ApplicationContext中。通过实现ImportBeanDefinitionRegistrar接口,这个类可以在Spring的配置阶段动态地添加Bean定义。

在这个特定的实现中,KafkaBootstrapConfiguration检查特定的Kafka相关Bean(如KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME和KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)是否已经注册。如果这些Bean尚未注册,它会使用RootBeanDefinition手动注册这些Bean到Spring容器中。

RootBeanDefinition 的功能

RootBeanDefinition是Spring框架中用于定义Bean的一个核心类。它是BeanDefinition接口的一个直接实现,提供了一种配置Spring管理的Bean的方式,包括Bean的类类型、生命周期回调、依赖信息等。

  • Bean配置的详细定义:RootBeanDefinition允许开发者详细定义Bean的创建细节,如构造函数参数、属性值、初始化方法、销毁方法等。
  • 高级功能:它还支持更复杂的配置,如懒加载、自动装配模式、作用域和其他高级特性。
  • 程序化Bean注册:通过使用RootBeanDefinition,开发者可以在运行时动态地注册Bean,这对于条件配置或需要响应不同配置环境的高级用途尤为重要。

KafkaBootstrapConfiguration类中,使用RootBeanDefinition来创建和注册KafkaListenerAnnotationBeanPostProcessorKafkaListenerEndpointRegistry类的实例,这些是设置和管理Kafka消息监听器所必需的。

之后在AbstractBeanFactory会根据 beanName 获取到了 RootBeanDefinition 如下图所示:
在这里插入图片描述
然后在如下所示的位置:
在这里插入图片描述
程序创建了 beanName 为 org.springframework.kafka.config.internalKafkaListenerEndpointRegistry 的实例,具体创建实例的位置如下:
在这里插入图片描述
从调试中可以看到此处实例化了 KafkaListenerEndpointRegistry
所以当我们 springboot 项目引入了

 <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

依赖后,即使我们不显示的声明 @EnableKafka 程序也会进行初始化相应的配置。

当Spring Boot项目中引入Spring Kafka依赖后,即使我们没有显式声明@EnableKafka,系统仍会自动进行相应的配置。因此,在项目中尝试注入KafkaListenerEndpointRegistry时,尽管IDE可能会提示“Could not autowire. No beans of ‘KafkaListenerEndpointRegistry’ type found.”,项目依然能够正常启动。这是因为KafkaListenerEndpointRegistry在Spring Kafka的自动配置过程中已被隐式注册。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: http://www.xdnf.cn/news/5071.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

ssm071北京集联软件科技有限公司信息管理系统+jsp

北京集联软件科技有限公司信息管理系统 现代经济快节奏发展以及不断完善升级的信息化技术&#xff0c;让传统数据信息的管理升级为软件存储&#xff0c;归纳&#xff0c;集中处理数据信息的管理方式。本信息管理系统就是在这样的大环境下诞生&#xff0c;其可以帮助管理…

四六级英语听力考试音频无线发射系统在安顺学院的成功应用分析

四六级英语听力考试音频无线发射系统在安顺学院的成功应用分析 由北京海特伟业科技任洪卓发布于2024年4月22日 安顺学院为了提高学生的外语听力水平&#xff0c;并确保英语四六级听力考试的稳定可靠进行&#xff0c;决定对传统的英语听力音频传输系统进行改造&#xff0c;以提供…

小游戏:贪吃蛇

&#x1f381;个人主页&#xff1a;我们的五年 &#x1f50d;系列专栏&#xff1a;贪吃蛇 &#x1f337;追光的人&#xff0c;终会万丈光芒 目录 &#x1f3dd;1.头文件&#xff1a; &#x1f3dd;2.实现文件&#xff1a; &#x1f3dd;3.测试文件 &#xff1a; 前言&#…

处理 SoC 中的性能瓶颈

SoC 中不断添加处理核心&#xff0c;但它们不会都得到充分利用&#xff0c;因为真正的瓶颈没有得到解决。 SoC 需要处理的数据量激增&#xff0c;虽然处理核心本身可以处理这些数据&#xff0c;但内存和通信带宽成为瓶颈。现在的问题是可以采取什么措施解决这个问题。 内存和 C…

深入了解直播美颜工具与视频美颜SDK的实现与优化策略

今天&#xff0c;小编将为大家详解视频美颜SDK技术的视线方案与优化策略。 一、美颜工具的实现原理 利用特征提取算法提取人脸的各种特征&#xff0c;如皮肤色调、眼睛大小等。接下来&#xff0c;根据用户设定的美颜参数&#xff0c;对提取的特征进行修改。最后&#xff0c;将…

【测试】Kali Linux 渗透安全学习笔记(4) - 单一服务器扫描

距离上次做 Kali Linux 分享已经相隔半年之久了&#xff0c;刚好需要主导公司每半年一次的本地安全加固工作&#xff0c;这次将简单分享自己是如何做单一服务器的扫描。 声明&#xff1a; 本文测试的站点为自家站点仅做学习使用&#xff0c;不存在侵犯网络信息安全问题&#…

IDEA 2024.1 配置 AspectJ环境

最近Java课设在学习AspectJ&#xff0c;做PPT顺便写一个博客 首先去AspectJ官网下载一个JAR包并安装 安装完最后可以按照他的建议配置一下 然后找到AspectJ的安装位置的lib目录&#xff0c;把三个包拷到自己项目中的lib目录下 由于最新版的IDEA已经不支持AspectJ了 所…

R可视化:桑基图展示数据层流动

介绍 以桑基图形式展示数据分布情况 knitr::opts_chunk$set(message = FALSE, warning = FALSE) library(tidyverse) library(ggalluvial)# rm(list = ls()) options(stringsAsFactors = F) options(future.globals.maxSize = 10000 * 1024^2) metadata…

【STM32+HAL+Proteus】系列学习教程3---GPIO输出模式(LED流水灯、LED跑马灯)

实现目标 1、掌握GPIO 输出模式控制 2、学会STM32CubeMX软件配置GPIO 3、具体目标&#xff1a;1、开发板4个LED实现流水灯&#xff1b;2、开发板4个LED实现跑马灯灯。 一、STM32 GPIO 概述 1、GPIO定义 GPIO&#xff08;General-purpose input/output&#xff09;是通用输入…

WPF2 样式布局

样式布局 WPF中的各类控件元素, 都可以自由的设置其样式。 诸如: 字体(FontFamily) 字体大小(FontSize) 背景颜色(Background) 字体颜色(Foreground) 边距(Margin) 水平位置(HorizontalAlignment) 垂直位置(VerticalAlignment) 等等。 而样式则是组织和重用以上的重要工具。…

标题Selenium IDE 常见错误笔记

Selenium IDE 常见错误笔记 错误1&#xff1a;Failed:Exceeded waiting time for new window to appear 2000ms 这个错误通常出现在第一次运行时&#xff0c;有两个原因&#xff1a; Firefox阻止了弹出式窗口&#xff0c;在浏览器设置里允许这个操作即可。 有些网站设置了反…

循环神经网络实例——序列预测

我们生活的世界充满了形形色色的序列数据&#xff0c;只要是有顺序的数据统统都可以看作是序列数据&#xff0c;比如文字是字符的序列&#xff0c;音乐是音符组成的序列&#xff0c;股价数据也是序列&#xff0c;连DNA序列也属于序列数据。循环神经网络RNN天生就具有处理序列数…

物联网配网工具多元化助力腾飞——智能连接,畅享未来

随着物联网技术的迅猛发展&#xff0c;智能插座、蓝牙网关作为其中常见的智能物联设备&#xff0c;无论是功能还是外观都有很大的改进&#xff0c;在智能化越来越普遍的情况下&#xff0c;它们的应用场景也在不断拓宽。对于智能设备而言&#xff0c;配网方式的选择对于设备的成…

指针专题(4)【qsort函数的概念和使用】

1.前言 上节我们学习了指针的相关内容&#xff0c;本节我们在有指针的基础的条件下学习一下指针的运用&#xff0c;那么废话不多说&#xff0c;我们正式进入今天的学习 2.回调函数 我们既然已经学习了指针的相关基础&#xff0c;那么我们此时就可以用指针来实现回调函数 而回…

无人驾驶 自动驾驶汽车 环境感知 精准定位 决策与规划 控制与执行 高精地图与车联网V2X 深度神经网络学习 深度强化学习 Apollo

无人驾驶 百度apollo课程 1-5 百度apollo课程 6-8 七月在线 无人驾驶系列知识入门到提高 当今,自动驾驶技术已经成为整个汽车产业的最新发展方向。应用自动驾驶技术可以全面提升汽车驾驶的安全性、舒适性,满足更高层次的市场需求等。自动驾驶技术得益于人工智能技术的应用…

面试十五 容器

一、vector容器 template<typename T> class Allocator{ public:T* allocator(size_t size){// 负责内存开辟return (T*)malloc(sizeof(T) * size);}void deallocate(void * p){free(p);}void construct(T*p,const T&val){// 定位newnew (p) T(val);}void destroy(…

区块链技术与跨境支付:探讨Web3的国际支付系统

引言&#xff1a;跨境支付的复杂性与机遇 在全球化的背景下&#xff0c;跨境支付已经成为经济活动中不可或缺的一部分。然而&#xff0c;当前的跨境支付系统由于其复杂性和不透明性&#xff0c;往往面临着高额费用、延迟和潜在的安全隐患。区块链技术的出现为这一领域带来了前…

kafka 命令行使用 消息的写入和读取 quickstart

文章目录 Intro命令日志zookeeper serverkafka servercreate topic && describe topic Intro Kafka在大型系统中可用作消息通道&#xff0c;一般是用程序语言作为客户端去调用kafka服务。 不过在这之前&#xff0c;可以先用下载kafka之后就包含的脚本文件等&#xff0…

HarmonyOS开发 - hilog日志系统

文章目录 一、介绍二、对日志的封装 一、介绍 harmony OS提供了系统日志打印 hilog开发者可以通过使用这些接口实现日志相关功能&#xff0c;输出日志时可以指定日志类型、所属业务领域、日志TAG标识、日志级别等。系统文档HiLog日志打印