第 6 章 管理 Kafka
使用额外的配置属性来维护 AMQ Streams 的部署。您可以添加和调整设置,以响应 AMQ Streams 的性能。例如,您可以引入其他配置以提高吞吐量和数据可靠性。
6.1. 调整 Kafka 配置
使用配置属性来优化 Kafka 代理、生产者和消费者的性能。 需要一组最小配置属性,但您可以添加或调整属性,以更改制作者和消费者与 Kafka 代理交互的方式。例如,您可以调整消息的延迟和吞吐量,以便客户端可以实时响应数据。 您可能首先分析指标以量化进行初始配置的位置,然后进行增量更改,并进一步比较指标,直到您有所需的配置。 有关 Apache Kafka 配置属性的更多信息,请参阅 Apache Kafka 文档 。
6.1.1. Kafka 代理配置调整
使用配置属性来优化 Kafka 代理的性能。您可以使用标准 Kafka 代理配置选项,但由 AMQ Streams 直接管理的属性除外。
6.1.1.1. 基本代理配置
基本配置将包含以下属性来识别您的代理并提供安全访问:
broker.id
是 Kafka 代理的 ID
log.dirs
是日志数据的目录
zookeeper.connect
是 Kafka 与 ZooKeeper 连接的配置
侦听器
将 Kafka 集群公开给客户端
授权机制
允许或拒绝用户执行的操作
身份验证机制
证明了需要访问 Kafka 的用户的身份
有关配置
Kafka
中基本配置选项的详情,。
典型的代理配置还包括与主题、线程和日志相关的属性的设置。
基本代理配置属性
# ... num.partitions=1 default.replication.factor=3 offsets.topic.replication.factor=3 transaction.state.log.replication.factor=3 transaction.state.log.min.isr=2 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 num.network.threads=3 num.io.threads=8 num.recovery.threads.per.data.dir=1 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 group.initial.rebalance.delay.ms=0 zookeeper.connection.timeout.ms=6000 # ...
6.1.1.2. 复制主题以实现高可用性
基本主题属性为主题设置默认分区数和复制因素,这些主题将应用于没有显式设置这些属性创建的主题,包括何时自动创建主题。
# ... num.partitions=1 auto.create.topics.enable=false default.replication.factor=3 min.insync.replicas=2 replica.fetch.max.bytes=1048576 # ...
auto.create.topics.enable
属性默认为启用,以便在生产者和消费者需要时自动创建不存在的主题。如果使用自动主题创建,您可以使用
num.partitions
为主题设置默认分区数量。通常,此属性被禁用,以便在创建主题时提供更多的控制
对于高可用性环境,建议将复制因素增加到至少 3 个用于主题,并将最小 in-sync 副本数量设置为复制因素小 1。
对于
数据持久性
,您还应在主题配置中设置
min.insync.replicas
,并在生成者配置中使用
acks=all
进行消息交付确认。
使用
replica.fetch.max.bytes
设置复制领导分区的每个后续程序获取的最大大小(以字节为单位)。根据平均消息大小和吞吐量更改这个值。在考虑读/写缓冲区所需的总内存分配时,可用内存也必须能适应所有后续者的最大复制消息大小。大小还必须大于
message.max.bytes
,以便复制所有消息。
delete.topic.enable
属性默认为启用,以允许删除主题。在生产环境中,您应该禁用此属性以避免意外删除主题,从而导致数据丢失。但是,您可以临时启用它并删除主题,然后再次禁用它。
# ... auto.create.topics.enable=false delete.topic.enable=true # ...
6.1.1.3. 事务和提交的内部主题设置
如果您使用事务
启用对生成者中的分区的原子写入,则事务的状态存储在内部
__transaction_state
主题。默认情况下,代理配置有 3 个复制因素,以及此主题的最小同步副本,这意味着 Kafka 集群中最少需要三个代理。
# ... transaction.state.log.replication.factor=3 transaction.state.log.min.isr=2 # ...
同样,存储消费者状态的内部
__consumer_offsets
主题具有分区和复制因素的数量的默认设置。
# ... offsets.topic.num.partitions=50 offsets.topic.replication.factor=3 # ...
不要在生产环境中减少这些设置。 在 生产 环境中,您可以提高设置。作为例外,您可能需要减少单代理 测试环境中的设置 。
6.1.1.4. 通过增加 I/O 线程来提高请求处理吞吐量
网络线程处理对 Kafka 集群的请求,如从客户端应用程序生成和获取请求。生成请求将置于请求队列中。响应放置在响应队列中。 网络线程数量应该反映了复制因者和与 Kafka 集群交互的用户的活动级别。如果您要拥有大量请求,您可以使用闲置时间线程数量来增加线程数量,以确定何时添加更多线程。 要减少拥塞并规范请求流量,您可以在网络线程被阻止前限制请求队列中允许的请求数。 I/O 线程从请求队列获取请求来处理它们。添加更多线程可以提高吞吐量,但 CPU 内核和磁盘带宽的数量会实施实际的上限。至少,I/O 线程的数量应等于存储卷的数量。
# ... num.network.threads=3 1 queued.max.requests=500 2 num.io.threads=8 3 num.recovery.threads.per.data.dir=1 4 # ...
- 1
-
Kafka 集群的网络线程数量。 请求队列中允许的请求数。 Kafka 代理的 I/O 线程数量。 在启动时用于日志载入的线程数量,并在关闭时清除。 所有代理的线程池的配置更新可能会在集群级别动态发生。这些更新仅限于当前大小的一半和两倍的当前大小。 Kafka 代理指标可帮助处理所需的线程数量。例如,平均时间网络线程的指标是空闲的(
kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent
)表示使用的资源百分比。如果有 0% 空闲时间,则使用所有资源,这意味着添加更多线程可能很有用。 如果线程因为磁盘数量而较慢或限制,您可以尝试增加网络请求的缓冲区的大小,以提高吞吐量:# ... replica.socket.receive.buffer.bytes=65536 # ...
另外,增加 Kafka 可以接收的最大字节数:
# ... socket.request.max.bytes=104857600 # ...
6.1.1.5. 为高延迟连接增加带宽
Kafka 批处理数据,以便在从 Kafka 到客户端(如数据中心之间的连接)上实现合理的吞吐量。但是,如果高延迟问题,您可以增加缓冲区的大小来发送和接收信息。
# ... socket.send.buffer.bytes=1048576 socket.receive.buffer.bytes=1048576 # ...
您可以使用 bandwidth-delay 产品 计算来估算缓冲区的最佳大小,这会乘以带往返延迟(以秒为单位)链路的最大带宽(以秒为单位)以达到最大吞吐量。
6.1.1.6. 使用数据保留策略管理日志
Kafka 使用日志来存储消息数据。日志是与各种索引关联的一系列片段。新消息被写入 active 片段,之后永远不会修改。当服务从消费者获取请求时,可读取片段。有时候,活跃段会 滚动到 变为只读,一个新的活跃段会被创建来替代它。一次只有一个段处于活跃状态。保留旧的片段,直到它们有资格删除。 在代理级别配置设置日志片段的最大大小(以字节为单位),以及活跃的片段前的时间(毫秒):
# ... log.segment.bytes=1073741824 log.roll.ms=604800000 # ...
您可以使用
segment.bytes
和
segment.ms
在主题级别上覆盖这些设置。您需要降低或提升这些值,这取决于删除网段的策略。较大的大小表示活跃片段包含更多信息,且会更频繁地推出。段也有资格减少删除。
您可以设置基于时间的日志保留和清理策略,以便可以保持管理的日志。根据您的要求,您可以使用日志保留配置删除旧的片段。如果使用日志保留策略,则在达到保留限制时会删除非活跃日志片段。删除旧的片段绑定了日志所需的存储空间,因此您不会超过磁盘容量。
对于基于时间的日志保留,您可以根据小时、分钟和毫秒设置保留周期。保留周期基于消息附加到段中的时间。
毫秒配置的优先级超过分钟,其优先级高于小时。默认情况下,minutes 和 milliseconds 配置是 null,但三个选项提供了对您要保留的数据进行大量控制。首选项应提供给毫秒配置,因为它是唯一可以动态更新的三个属性之一。
# ... log.retention.ms=1680000 # ...
如果
log.retention.ms
设置为 -1,则不会将时间限制应用到日志保留,因此所有日志都会被保留。磁盘用量应始终被监控,但通常不建议使用 -1 设置,因为它可能会导致完整磁盘出现问题,这可能会难以重新显示。
对于基于大小的日志保留,您可以以字节为单位设置最大日志大小(日志中所有片段):
# ... log.retention.bytes=1073741824 # ...
换句话说,日志通常会在达到稳定状态后具有大约
log.retention.bytes/log.segment.bytes
片段。当达到最大日志大小时,会删除旧的片段。
使用最大日志大小的潜在问题是,它不会考虑时间信息被附加到段中。您可以对清理策略使用基于时间和大小的日志保留,以获得您需要的平衡。首先达到哪个阈值会触发清理。
如果要在从系统中删除段文件前添加时间延迟,您可以使用
log.segment.delete.delay.ms
用于代理级别或
file.delete.delay.ms
中特定主题。
# ... log.segment.delete.delay.ms=60000 # ...
6.1.1.7. 使用清理策略删除日志数据
删除旧的日志数据的方法由日志 清理 配置决定。 默认情况下,代理启用了 log cleaner:
# ... log.cleaner.enable=true # ...
您可以在主题或代理级别设置清理策略。代理级配置是没有设置策略的主题的默认设置。 您可以设置策略来删除日志、压缩日志或同时执行这两者:
# ... log.cleanup.policy=compact,delete # ...
delete
策略与使用数据保留策略管理日志对应。当不需要永久保留数据时,它非常适合。
紧凑
策略保证为每个消息键保留最新的消息。日志压缩适用于消息值可更改的位置,您希望保留最新的更新。
如果将清理策略设置为删除日志,则根据日志保留限制删除旧的片段。否则,如果没有启用日志清理程序,且没有日志保留限制,日志将继续增长。
如果为日志压缩设置了清理策略,则日志
头
作为标准 Kafka 日志运行,并按顺序写入附加新消息。在紧凑的日志的
尾部
,日志清理程序运行,如果日志中稍后发生具有相同键的另一个记录,将删除记录。具有 null 值的消息也会被删除。如果没有使用密钥,则无法使用压缩,因为需要密钥来识别相关消息。虽然 Kafka 保证保留每个密钥的最新信息,但它不能保证整个压缩的日志不会包含重复。
图 6.1. 在压缩前显示带有偏移位置的键值写入的日志

使用键识别信息,Kafka 压缩会为特定消息键保留最新的消息(具有最高偏移),最终丢弃具有相同键的早期消息。换句话说,其最新状态的消息始终可用,当日志清理程序运行时,该特定消息的任何过时的记录都会最终被删除。您可以将消息恢复到以前的状态。 即使周围记录被删除,记录也会保留其原始偏移量。因此,tail 可以具有非连续偏移。当消耗在 tail 中可用偏移时,会发现带有下一个偏移的记录。
图 6.2. 压缩后日志

如果您只选择紧凑策略,您的日志仍然可以变为任意的。在这种情况下,您可以将策略设置为紧凑 和 删除日志。如果您选择紧凑和删除,首先压缩日志数据,在日志头中使用键删除记录。之后,在日志保留阈值被删除前的数据。
图 6.3. 日志保留点和压缩点

您可以设置检查日志进行清理的频率(以毫秒为单位):
# ... log.retention.check.interval.ms=300000 # ...
调整与日志保留设置相关的日志保留检查间隔。较小的保留大小可能需要更频繁地检查。 清理的频率通常足以管理磁盘空间,但通常不会影响主题的性能。 如果没有要清理的日志,您还可以设置时间(以毫秒为单位),以将清理器放在待机中:
# ... log.cleaner.backoff.ms=15000 # ...
如果您选择删除旧的日志数据,您可以在清除前设置句点来保留已删除的数据:
# ... log.cleaner.delete.retention.ms=86400000 # ...
删除的数据保留周期提供了在数据被不可避免删除前注意到数据的时间。 要删除与特定密钥相关的所有消息,制作者可以发送 tombstone 消息。 tombstone 有一个 null 值,并作为标记来代表消费者删除该值。在压缩后,只保留 tombstone,它必须足够长的时间才能知道该消息已被删除。删除旧的消息时,没有值,tombstone 键也会从分区中删除。
6.1.1.8. 管理磁盘使用率
还有其他与日志清理相关的配置设置,但特别的重要性是内存分配。 deduplication 属性指定在所有日志清理线程中清理的总内存。您可以设置通过缓冲区负载因子使用的内存百分比的上限。
# ... log.cleaner.dedupe.buffer.size=134217728 log.cleaner.io.buffer.load.factor=0.9 # ...
每个日志条目都使用 24 字节,以便您可以处理缓冲区可以在单一运行中处理多少个日志条目,并相应地调整设置。 如果要减少日志清理时间,请考虑增加日志清理线程数量:
# ... log.cleaner.threads=8 # ...
如果您遇到 100% 磁盘带宽使用情况的问题,您可以节流日志清理 I/O,以便读/写操作的总和小于执行操作的磁盘功能的指定双值:
# ... log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 # ...
6.1.1.9. 处理大量消息大小
消息的默认批处理大小为 1MB,这是大多数用例中最大吞吐量的最佳选择。Kafka 可以在减少吞吐量下容纳更大的批处理,假设有足够的磁盘容量。 大型消息大小以四种方式处理: 生产者消息压缩 将压缩消息写入日志。 基于参考的消息传递仅发送对消息值中存储的其他系统中数据的引用。 内联消息传递将信息分成使用相同键的块,然后使用流处理器(如 Kafka Streams)在输出中合并。 构建的 broker 和 producer/consumer 客户端应用程序配置来处理更大的消息大小。 建议使用基于参考的消息和消息压缩选项,并涵盖大多数情况。对于这些选项,必须小心才能避免引入性能问题。
制作者压缩
对于生成者配置,您可以指定一个
compression.type
,如 Gzip,它被应用到制作者生成的数据的批处理。使用代理配置
compression.type=producer
,代理会保留使用制作者的任何压缩。每当制作者和主题压缩不匹配时,代理必须在将批处理附加到日志前再次压缩批处理,这会影响代理性能。
压缩还会在生产者和解压缩开销上增加额外的处理开销,但在批处理中包含更多的数据,因此当消息数据压缩良好时,吞吐量通常很有用。
将生成者压缩与批处理大小的微调相结合,以促进最佳吞吐量。使用指标有助于量化所需的平均批处理大小。
基于参考的消息传递
当您不知道消息有多大时,基于参考的消息传递对于数据复制非常有用。外部数据存储必须快速、持久且高度可用,才能使此配置正常工作。数据被写入数据存储,并返回对数据的引用。producer 发送一条消息,其中包含对 Kafka 的引用。消费者从消息中获取引用,并使用它来从数据存储中获取数据。
图 6.4. 基于参考的消息传递流

当消息传递需要更多行程时,端到端延迟会增加。这个方法的另一个显著缺陷是,在清理 Kafka 消息时,外部系统中没有自动清理数据。混合方法是仅将大型消息发送到数据存储并直接处理标准化消息。
内联消息传递
内联消息传递非常复杂,但它对基于参考的消息等外部系统没有开销。
如果消息太大,则生成客户端应用必须序列化,然后对数据进行阻塞。然后,生成者使用 Kafka
ByteArraySerializer
,或者在发送前再次序列化每个块。消费者跟踪消息和缓冲区块,直到它有完整的消息。消耗客户端应用程序接收块,这些块在进行序列化前被编译。根据每个块消息集合的第一个或最后一个块的偏移量,将完整消息发送到消耗应用程序的剩余部分。针对偏移元数据检查成功交付完整消息,以避免重新平衡期间重复。
图 6.5. 内联消息传递流

内联消息传递在消费者端具有性能开销,因为需要缓冲,特别是在并行处理一系列大型消息时。大型消息的块可能会变为交集,因此如果缓冲区中另一个大消息的块不完整,则无法提交消息的所有区块。因此,通常通过持久保留消息块或实施提交逻辑来支持缓冲。
配置以处理更大的信息
如果无法避免更大的消息,并且避免在消息流的任意点进行块,您可以增加消息限制。为此,请在主题级别上配置
message.max.bytes
,以设置各个主题的最大记录批处理大小。如果您在代理级别上设置
message.max.bytes
,则为所有主题允许更大的消息。
代理将拒绝任何大于
message.max.bytes
设置的限制的消息。producers 的缓冲区大小(
max.request.size
)和消费者(
message.max.bytes
)必须能够容纳更大的消息。
6.1.1.10. 控制消息数据的日志清除
log flush 属性控制缓存的消息数据定期写入磁盘。调度程序以毫秒为单位指定日志缓存上的检查频率:
# ... log.flush.scheduler.interval.ms=2000 # ...
您可以根据消息保留在内存中的最长时间以及日志中的最大信息数量来控制清除的频率,然后再写入磁盘:
# ... log.flush.interval.ms=50000 log.flush.interval.messages=100000 # ...
flushes 之间的等待时间包括进行检查的时间以及执行刷新前指定的间隔。增加清除的频率可能会影响吞吐量。 通常,建议不要设置显式清除阈值,并让操作系统使用默认设置执行后台清除。分区复制提供比写入任何单个磁盘更高的数据持久性,因为失败的代理可以从其同步的副本中恢复。 如果您使用应用程序清除管理,如果您正在使用更快速的磁盘,则设置较低冲刷阈值可能适当。
6.1.1.11. 分区重新平衡以实现可用性
分区可以在代理之间复制,以进行容错。对于给定分区,一个代理被选为 leader,并处理所有生成请求(写入日志)。在领导失败时,在其他代理中,分区会遵循其他代理为数据可靠性复制分区数据。
followers 通常不会提供客户端,但
broker.rack
允许消费者在 Kafka 集群跨越多个数据中心时消耗来自最接近的副本的消息。followers 仅操作从分区领导机复制消息,并允许在领导机失败时进行恢复。恢复需要同步的后续程序。遵循者通过向领导发送获取请求来保持同步,这将按顺序将消息返回到后续消息。如果已在领导消息中发现最近提交的消息,则后续者被视为同步。领导机通过查看后续程序请求的最后一个偏移来检查。不同步的后续者通常不符合领导机失败,
除非允许未清理的领导选举机制
。
您可以在考虑不同步前调整滞后时间:
# ... replica.lag.time.max.ms=30000 # ...
Lag time 对时间设置了一个上限,以将消息复制到所有同步副本以及生成者必须等待确认的时间。如果后续程序无法获取请求,并捕获指定滞后时间的最新消息,则会从同步的副本中删除。您可以更快地减少检测失败的副本的时间,但这样做可能会增加无法同步的后续者数量。正确的滞后时间值取决于网络延迟和代理磁盘带宽。
当领导分区不再可用时,会选择其中一个同步副本作为新的领导。分区副本列表中的第一个代理称为
首选
领导。默认情况下,根据定期检查领导发行版,为自动分区领导重新平衡启用 Kafka。也就是说,Kafka 会检查首选领导是否为
当前的
领导。重新平衡可确保领导在代理和代理间均匀分布,不会超载。
您可以使用 Cruise Control for AMQ Streams
将副本分配找出在集群中平均平衡负载的代理。其计算需要考虑领导和后续者所经历的不同负载。失败的领导会影响 Kafka 集群的平衡,因为剩余的代理会获得领导额外分区的额外工作。
对于 Cruise Control 发现的分配,实际上,分区需要被首选领导。Kafka 可以自动确保首选领导(在可能的情况下),根据需要更改当前的领导。这样可确保集群处于 Cruise Control 找到的均衡状态。
您可以在重新平衡检查前控制重新平衡检查的频率,以及代理允许的最大 imbalance 百分比。
auto.leader.rebalance.enable=true
leader.imbalance.check.interval.seconds=300
leader.imbalance.per.broker.percentage=10
代理的百分比 leader imbalance 是当前代理为当前领导的分区数量和首选领导的分区数量之间的比例。您可以将百分比设置为 0,以确保首选领导一直被选择,假设它们处于同步状态。
如果检查重新平衡需要更多控制,您可以禁用自动重新平衡。然后,您可以选择何时使用
kafka-leader-election.sh
命令行工具触发重新平衡。
由 AMQ Streams 提供的 Grafana 仪表板显示没有活跃领导的分区和分区的指标。
6.1.1.12. unclean 领导选举机制
对同步副本的领导选举被视为干净,因为它不会丢失数据。这是默认发生的情况。但是,如果没有同步的副本在领导方面需要什么?或许,ISR (同步副本)仅在领导磁盘结束时包含领导机。如果没有设置最少的 in-sync 副本数量,并且硬盘驱动器无法同步分区领导机时,数据将会丢失。不仅如此, 新的领导产品也无法被选举 ,因为没有同步的跟随者。 您可以配置 Kafka 如何处理领导失败:
# ... unclean.leader.election.enable=false # ...
默认情况下,不干净的领导选举机制被禁用,这意味着不同步的副本无法成为领导。使用干净的领导选举机制时,如果在旧领导丢失时没有其他代理,则 Kafka 会在消息写入或读取前等待该领导机在线。不干净的领导选举机制意味着不同步的副本可能会成为领导机,但您会面临丢失消息的风险。您做出的选择取决于您的要求是否优先使用可用性或持久性。 您可以在主题级别覆盖特定主题的默认配置。如果您无法承担数据丢失的风险,请保留默认配置。
6.1.1.13. 避免不必要的消费者组重新平衡
对于加入新消费者组的用户,您可以添加一个延迟,以避免对代理进行不必要的重新平衡:
# ... group.initial.rebalance.delay.ms=3000 # ...
延迟是协调器等待成员加入的时间长度。延迟时间越长,所有成员都将一次加入并避免重新平衡的可能性。但是,延迟也会阻止组消耗到周期结束为止。
6.1.2. Kafka producer 配置调整
使用带有针对特定用例量身定制的可选属性的基本制作者配置。 调整配置以最大化吞吐量可能会增加延迟,反之亦然。您需要对生成者配置进行试验和调优,以获得所需的平衡。
6.1.2.1. 基本制作者配置
每个制作者都需要 connection 和 serializer 属性。通常,最好为跟踪添加客户端 ID,并使用生成者压缩来减小请求的批处理大小。 在基本制作者配置中: 无法保证分区中的消息顺序。 到达代理的消息不能保证持久性。