3.2 Topic配置
与topic相关的配置,服务器的默认值,也可可选择的覆盖指定的topic。如果没有给出指定topic的配置,则将使用服务器默认值。 可以通过-config选项在topic创建时设置。此示例使用自定义最大消息的大小和刷新率,创建一个名为
my-topic
的topic:
> bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 1
--replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1
也可以使用
alter configs
命令修改或设置。 此示例修改更新
my-topic
的最大的消息大小:
> bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic
--alter --add-config max.message.bytes=128000
你可以执行以下命令验证结果
> bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --describe
> bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --alter --delete-config max.message.bytes
以下是topic级配置。是服务器的默认配置。服务器默认配置值仅适用于topic。
服务器默认属性
cleanup.policy
“delete”或“compact”。指定在旧的日志段的保留策略。默认策略(“delete”),将达到保留时间或大小限制的日志废弃。 “compact”则压缩日志。
delete
[compact, delete]
log.cleanup.policy
medium
compression.type
针对指定的topic设置最终的压缩方式。标准的压缩格式有'gzip', 'snappy', lz4。还可以设置'uncompressed',就是不压缩;设置为'producer'这意味着保留生产者设置的原始压缩编解码。
string
producer
[uncompressed, snappy, lz4, gzip, producer]
compression.type
medium
delete.retention.ms
保留删除消息压缩topic的删除标记的时间。此设置还给出消费者如果从offset 0开始读取并确保获得最终阶段的有效快照的时间范围(否则,在完成扫描之前可能已经回收了)。
86400000
[0,...]
log.cleaner.delete.retention.ms
medium
file.delete.delay.ms
从文件系统中删除文件之前等待的时间
60000
[0,...]
log.segment.delete.delay.ms
medium
flush.messages
此设置允许指定我们强制fsync写入日志的数据的间隔。例如,如果这被设置为1,我们将在每个消息之后fsync; 如果是5,我们将在每五个消息之后fsync。一般,我们建议不要设置它,使用复制特性来保持持久性,并允许操作系统的后台刷新功能更高效。可以在每个topic的基础上覆盖此设置(请参阅每个主题的配置部分)。
medium
9223372036854775807
[0,...]
log.flush.interval.messages
flush.ms
此设置允许我们强制fsync写入日志的数据的时间间隔。例如,如果这设置为1000,那么在1000ms过去之后,我们将fsync。 一般,我们建议不要设置它,并使用复制来保持持久性,并允许操作系统的后台刷新功能,因为它更有效率
9223372036854775807
[0,...]
log.flush.interval.ms
medium
follower.replication.throttled.replicas
follower复制限流列表。该列表应以[PartitionId]的形式描述一组副本:[BrokerId],[PartitionId]:[BrokerId]:...或者通配符'*'可用于限制此topic的所有副本。
[partitionId],[brokerId]:[partitionId],[brokerId]:...
follower.replication.throttled.replicas
medium
index.interval.bytes
此设置控制Kafka向其offset索引添加索引条目的频率。默认设置确保我们大致每4096个字节索引消息。 更多的索引允许读取更接近日志中的确切位置,但使索引更大。你不需要改变这个值。
[0,...]
log.index.interval.bytes
medium
leader.replication.throttled.replicas
在leader方面进行限制的副本列表。该列表设置以[PartitionId]的形式描述限制副本:[PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:...或使用通配符‘*’限制该topic的所有副本。
[partitionId],[brokerId]:[partitionId],[brokerId]:...
leader.replication.throttled.replicas
medium
max.message.bytes
kafka允许的最大的消息批次大小。如果增加此值,并且消费者的版本比0.10.2老,那么消费者的提取的大小也必须增加,以便他们可以获取大的消息批次。
在最新的消息格式版本中,消息总是分组批量来提高效率。在之前的消息格式版本中,未压缩的记录不会分组批量,并且此限制仅适用于该情况下的单个消息。
1000012
[0,...]
message.max.bytes
medium
message.format.version
指定消息附加到日志的消息格式版本。该值应该是一个有效的ApiVersion。例如:0.8.2, 0.9.0.0, 0.10.0,更多细节检查ApiVersion。通过设置特定的消息格式版本,并且磁盘上的所有现有消息都小于或等于指定版本。不正确地设置此值将导致消费者使用旧版本,因为他们将接收到“不认识”的格式的消息。
string
0.11.0-IV2
log.message.format.version
medium
min.cleanable.dirty.ratio
此配置控制日志压缩程序将尝试清除日志的频率(假设启用了日志压缩)。默认情况下,我们将避免清理超过50%日志被压缩的日志。 该比率限制日志中浪费的最大空间重复(在最多50%的日志中可以是重复的50%)。更高的比率意味着更少,更有效的清洁,但意味着日志中的浪费更多。
double
[0,...,1]
log.cleaner.min.cleanable.ratio
medium
min.compaction.lag.ms
消息在日志中保持不压缩的最短时间。仅适用于正在压缩的日志。
[0,...]
log.cleaner.min.compaction.lag.ms
medium
min.insync.replicas
当生产者设置应答为"all"(或“-1”)时,此配置指定了成功写入的副本应答的最小数。如果没满足此最小数,则生产者将引发异常(NotEnoughReplicas或NotEnoughReplicasAfterAppend)
当min.insync.replicas和acks强制更大的耐用性时。典型的情况是创建一个副本为3的topic,将min.insync.replicas设置为2,并设置acks为“all”。如果多数副本没有收到写入,这将确保生产者引发异常。
[1,...]
min.insync.replicas
medium
preallocate
如果我们在创建新的日志段时在磁盘上预分配该文件,那么设为True。
boolean
false
log.preallocate
medium
retention.bytes
如果我们使用“删除”保留策略,则此配置将控制日志可以增长的最大大小,之后我们将丢弃旧的日志段以释放空间。默认情况下,没有设置大小限制则仅限于时间限制。
log.retention.bytes
medium
retention.ms
如果我们使用“删除”保留策略,则此配置控制我们将保留日志的最长时间,然后我们将丢弃旧的日志段以释放空间。这代表SLA消费者必须读取数据的时间长度。
604800000
log.retention.ms
medium
segment.bytes
此配置控制日志的段文件大小。一次保留和清理一个文件,因此较大的段大小意味着较少的文件,但对保留率的粒度控制较少。
1073741824
[14,...]
log.segment.bytes
medium
segment.index.bytes
此配置控制offset映射到文件位置的索引的大小。我们预先分配此索引文件,并在日志滚动后收缩它。通常不需要更改此设置。
10485760
[0,...]
log.index.size.max.bytes
medium
segment.jitter.ms
从计划的段滚动时间减去最大随机抖动,以避免异常的段滚动
[0,...]
log.roll.jitter.ms
medium
segment.ms
此配置控制Kafka强制日志滚动的时间段,以确保保留可以删除或压缩旧数据,即使段文件未满。
604800000
[0,...]
log.roll.ms
medium
unclean.leader.election.enable
是否将不在ISR中的副本作为最后的手段选举为leader,即使这样做可能会导致数据丢失。
boolean
false
unclean.leader.election.enable
medium
kafka新版本新增的配置
kafka > 2.0
服务器默认属性
message.downconversion.enable
此配置控制是否启用消息格式的向下转换以满足消费请求。当设置为false时,broker不会对期待旧消息格式的消费者执行向下转换。broker 会对来自此类旧客户端的消费请求作出 UNSUPPORTED_VERSION
错误响应。这个配置不适用于复制到followers时可能需要的任何消息格式转换。
boolean
false
log.message.downconversion.enable