添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

那些配置关联了 kafka 服务器暴露给客户端的地址信息

和如下问题关联, 暴露给客户端的 地址信息 只要是来自于 advertisedListeners 相关

当前机器的虚拟机[NAT网络]映射出 9200 到宿主机, 局域网的其他机器访问 宿主机+9200 访问不到 kafka 的服务

引用自上面的链接

FindCoordinatorRequest 获取的数据来自于 metadataSnapshot
kafkaController.sendUpdateMetadataRequest 发生事件的时候 更新 metadataSnapshot 的请求
aliveNodes 来自于 broker 的相关信息, 这个 brokerInfo 来自于 KafkaController.initializeControllerContext
KafkaController.initializeControllerContext 的 aliveNodes 来自于 zkClient 向 "/brokers/ids/*" 获取 brokerId 列表, 然后在依次获取 "brokers/ids/$id" 的数据信息, 作为 Broker 的元数据信息
broker 注册到 zk 的信息来自于 KafkaServer 启动的时候向 zk 注册的, endpoints 的相关信息来自于 "advertised.listeners", "advertised.host.name:advertised.host.port", "listeners", "host.name:port"

具体的获取 advertisedListeners 相关的处理如下

kafka 启动之后还会连接 broker ?

可以发现一个问题, 就是 假设 我 advertised.listeners 配置了一个存在问题的配置, ip 不存在, 或者 服务不存在

服务器启动 大致会报错如下

java.net.SocketTimeoutException: Failed to connect within 30000 ms
	at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:297)
	at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:250)
	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)

这个错误是什么呢?, 影响到的又有哪些呢?

这个请求的处理, 主要是来自于 controller

kafkaServer - kafkaController - controllerChannelManager - requestSendThread, 再到具体的 requestSendThread 就是最原始导致上面报错的地方了,

在各种事件触发的时候, controller 需要连接 broker, 发送请求[由 requestSendThread 来发送请求], 比如 RegisterBrokerAndReelect 的时候, 需要 选举 controller, 之后需要发送 UpdateMetadataRequest

然后 客户端尝试连接 kafka 服务, 会得到报错大致如下

[10:44:05.239] WARN  org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater 1023 handleCompletedMetadataResponse - [Consumer clientId=consumer-1, groupId=test3] Error while fetching metadata with correlation id 29 : {test20220528=LEADER_NOT_AVAILABLE}
[10:44:05.365] WARN  org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater 1023 handleCompletedMetadataResponse - [Consumer clientId=consumer-1, groupId=test3] Error while fetching metadata with correlation id 30 : {test20220528=LEADER_NOT_AVAILABLE}
[10:44:05.484] WARN  org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater 1023 handleCompletedMetadataResponse - [Consumer clientId=consumer-1, groupId=test3] Error while fetching metadata with correlation id 31 : {test20220528=LEADER_NOT_AVAILABLE}
[10:44:05.604] WARN  org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater 1023 handleCompletedMetadataResponse - [Consumer clientId=consumer-1, groupId=test3] Error while fetching metadata with correlation id 32 : {test20220528=LEADER_NOT_AVAILABLE}
[10:44:05.721] WARN  org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater 1023 handleCompletedMetadataResponse - [Consumer clientId=consumer-1, groupId=test3] Error while fetching metadata with correlation id 33 : {test20220528=LEADER_NOT_AVAILABLE}
[10:44:05.840] WARN  org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater 1023 handleCompletedMetadataResponse - [Consumer clientId=consumer-1, groupId=test3] Error while fetching metadata with correlation id 34 : {test20220528=LEADER_NOT_AVAILABLE}
[10:44:05.953] WARN  org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater 1023 handleCompletedMetadataResponse - [Consumer clientId=consumer-1, groupId=test3] Error while fetching metadata with correlation id 35 : {test20220528=LEADER_NOT_AVAILABLE}
[10:44:06.065] WARN  org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater 1023 handleCompletedMetadataResponse - [Consumer clientId=consumer-1, groupId=test3] Error while fetching metadata with correlation id 36 : {test20220528=LEADER_NOT_AVAILABLE}
[10:44:06.187] WARN  org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater 1023 handleCompletedMetadataResponse - [Consumer clientId=consumer-1, groupId=test3] Error while fetching metadata with correlation id 37 : {test20220528=LEADER_NOT_AVAILABLE}

kakfa 事件相关处理

kafkaServer - kafkaController - eventManager - controllerEventThread

controllerEventThread 中的事件处理, 主要又是委托给了 processor, 也就是 kafkaController

这个 controller 的核心业务就是这各种事件的处理

事件主要是来自于 kafka 本身, 以及监听 zk 的相关状态变化 产生

controller 是如何选举的?

基于 zk 来选举 controller

如果 创建节点 "/controller"  成功, 成功的 broker 即为 controller, 其他的节点不是

broker 来尝试在 zk 上面创建 "/controller" 节点

broker 的编号,如果集群 有多个broker ,则每个broker 的编号需要设置的不同 kafka 监听端口,一般默认是9092, 使用 1024以前端口的话就需要root权限。E.g:开启线程消费时用到的就是这个端口 zookeeper.connect 用于保存broker元数据的zo...
一、 Kafka 概述 PUBLISH & SUBSCRIBE Read and write streams of data like a messaging system. 发布和订阅 读取和写入数据流,类似消息传递系统。 PROCESS Write scalable stream processing applications that react to events in real-t...
Kafka -API及 配置 项说明一.前言    在这里对生产者和消费者的API做一些说明,在开发 按实际需求选择 使用 二.Producer    2.1 producer的实例化        在新版本 Kafka ,不再 使用 之前需要 config 类来读取 配置 信息 Producer Config config = new Producer Config (properties);
2020-08-19 22:55:56.089 WARN [-,,,] 35672 --- [ad | producer-1] org.apache. kafka .clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 483 : {abc_001=LEADER_NOT_AVAILABLE} 2020-08-19 22:55:56.205 WARN.
位置:org.apache. kafka .clients; 作用:用于异步请求/响应网络i/o的网络客户端。这是一个内部类,用于实现面向用户的生产者和消费者客户端,也就是说我们的生产请求和消费请求最后都会交给NetworkClient类处理。 在sender线程 run时调用去。用于发送请求 通过连接处理实际的读写操作............
102 在 CommandLineRunner 中执行 IBaseService.list 报错 “Parameter ‘paramType_Gr8re1Ee‘ not found“ 教练、我想打篮球: 纠正问题中两个关键的 Interceptor 的顺序 102 在 CommandLineRunner 中执行 IBaseService.list 报错 “Parameter ‘paramType_Gr8re1Ee‘ not found“ hsqlrgy: 应该咋解决呢,我在项目启动也碰到了这个问题