使用场景:
某些时候,当几个topic生产者突发产生大量消息时,会造成磁盘空间紧张,这时,除了增加磁盘,另一个方法就是修改配置文件,将日志的保存时间修改小一点,但这两种方案,都必须停机和重启kafka,显然,这在生产集群上,是不能这么处理的。这里,可以通过在线修改单个topic的配置,以覆盖默认配置,临时解决磁盘空间紧张的问题。
特点
在线修改,不需要重启和停机,但是只能对单个 topic 进行修改,只能按照 ms 修改
修改后,新的配置会在
log.retention.check.interval.ms
时间内被检查并应用到整个集群,该值默认为 300 秒
kafka-config 命令
kafka-config 相对于 kafka-topics 而言, 在配置修改方面做得更全面,用户可以指定 config 的 对象 : topic, client, user or broker.
kafka-configs.sh --help 查看帮助
[tomcat@kafka-test ~]$ kafka-configs.sh --help
This tool helps to manipulate and describe entity config for a topic, client, user or broker
Option Description
------ -----------
--add-config <String> Key Value pairs of configs to add.
Square brackets can be used to group
values which contain commas: 'k1=v1,
k2=[v1,v2,v2],k3=v3'. The following
is a list of valid configurations:
For entity-type 'topics':
cleanup.policy
compression.type
delete.retention.ms
file.delete.delay.ms
flush.messages
flush.ms
follower.replication.throttled.
replicas
index.interval.bytes
leader.replication.throttled.replicas
max.compaction.lag.ms
max.message.bytes
message.downconversion.enable
message.format.version
message.timestamp.difference.max.ms
message.timestamp.type
min.cleanable.dirty.ratio
min.compaction.lag.ms
min.insync.replicas
preallocate
retention.bytes
retention.ms
segment.bytes
segment.index.bytes
segment.jitter.ms
segment.ms
unclean.leader.election.enable
For entity-type 'brokers':
log.message.timestamp.type
ssl.client.auth
log.retention.ms
sasl.login.refresh.window.jitter
sasl.kerberos.ticket.renew.window.
factor
log.preallocate
log.index.size.max.bytes
sasl.login.refresh.window.factor
ssl.truststore.type
ssl.keymanager.algorithm
log.cleaner.io.buffer.load.factor
sasl.login.refresh.min.period.seconds
ssl.key.password
background.threads
log.retention.bytes
ssl.trustmanager.algorithm
log.segment.bytes
max.connections.per.ip.overrides
log.cleaner.delete.retention.ms
log.segment.delete.delay.ms
min.insync.replicas
ssl.keystore.location
ssl.cipher.suites
log.roll.jitter.ms
log.cleaner.backoff.ms
sasl.jaas.config
principal.builder.class
log.flush.interval.ms
log.cleaner.max.compaction.lag.ms
max.connections
log.cleaner.dedupe.buffer.size
log.flush.interval.messages
advertised.listeners
num.io.threads
listener.security.protocol.map
log.message.downconversion.enable
sasl.enabled.mechanisms
sasl.login.refresh.buffer.seconds
ssl.truststore.password
listeners
metric.reporters
ssl.protocol
sasl.kerberos.ticket.renew.jitter
ssl.keystore.password
sasl.mechanism.inter.broker.protocol
log.cleanup.policy
sasl.kerberos.principal.to.local.rules
sasl.kerberos.min.time.before.relogin
num.recovery.threads.per.data.dir
log.cleaner.io.max.bytes.per.second
log.roll.ms
ssl.endpoint.identification.algorithm
unclean.leader.election.enable
message.max.bytes
log.cleaner.threads
log.cleaner.io.buffer.size
max.connections.per.ip
sasl.kerberos.service.name
ssl.provider
follower.replication.throttled.rate
log.index.interval.bytes
log.cleaner.min.compaction.lag.ms
log.message.timestamp.difference.max.
ssl.enabled.protocols
log.cleaner.min.cleanable.ratio
replica.alter.log.dirs.io.max.bytes.
per.second
ssl.keystore.type
ssl.secure.random.implementation
ssl.truststore.location
sasl.kerberos.kinit.cmd
leader.replication.throttled.rate
num.network.threads
compression.type
num.replica.fetchers
For entity-type 'users':
request_percentage
producer_byte_rate
SCRAM-SHA-256
SCRAM-SHA-512
consumer_byte_rate
For entity-type 'clients':
request_percentage
producer_byte_rate
consumer_byte_rate
Entity types 'users' and 'clients' may
be specified together to update
config for clients of a specific
user.
--alter Alter the configuration for the entity.
--bootstrap-server <String: server to The Kafka server to connect to. This
connect to> is required for describing and
altering broker configs.
--command-config <String: command Property file containing configs to be
config property file> passed to Admin Client. This is used
only with --bootstrap-server option
for describing and altering broker
configs.
--delete-config <String> config keys to remove 'k1,k2'
--describe List configs for the given entity.
--entity-default Default entity name for
clients/users/brokers (applies to
corresponding entity type in command
line)
--entity-name <String> Name of entity (topic name/client
id/user principal name/broker id)
--entity-type <String> Type of entity
(topics/clients/users/brokers)
--force Suppress console prompts
--help Print usage information.
--version Display Kafka version.
--zookeeper <String: urls> REQUIRED: The connection string for
the zookeeper connection in the form
host:port. Multiple URLS can be
given to allow fail-over.
[tomcat@kafka-test ~]$
修改 topic 的参数:
可在创建topic时,通过--config进行指定项的参数配置,覆盖默认配置:
> bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 1 --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1
也可以在创建topic之后通过config.sh文件对其中的特定指标进行修改,下面操作对my-topic相关指标进行配置:
> bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --alter --add-config max.message.bytes=128000
查看是否修改成功:
> bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --describe
也可以撤销/删除某些指定配置,将该项重置为默认配置:
> bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --alter --delete-config max.message.bytes
修改 broker 的参数:
改变当前broker 0上的log cleaner threads可以通过下面命令实现:
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --add-config log.cleaner.threads=2
查看当前broker 0的动态配置参数:
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --describe
删除broker id为0的server上的配置参数/设置为默认值:
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --delete-config log.cleaner.threads
同时更新集群上所有broker上的参数(cluster-wide类型,保持所有brokers上参数的一致性):
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --alter --add-config log.cleaner.threads=2
查看当前集群中动态的cluster-wide类型的参数列表:
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --describe
kafka-topics.sh 命令
老版本的 kafka 使用 kafka-topics.sh 修改,新版本这种方式可能会被弃用
#1,查看当前topic配置
./kafka-topics.sh --describe --topic my_test_topic --zookeeper test.myzk.com:2181/kafkacluster
#2,调整topic配置
./kafka-topics.sh --topic my_test_topic --zookeeper test.myzk.com:2181/kafkacluster --alter --config retention.ms=43200000
# 时长毫秒 43200000ms=12h
#3,检查修改的配置是否生效
同第一步,查看输出的第一行,类似如下:
Topic:__consumer_offsets PartitionCount:50 ReplicationFactor:3 Configs:segment.bytes=104857600,delete.retention.ms=86400000,retention.ms=43200000,cleanup.policy=delete,compression.type=producer
其他可选的调整参数:
segment.bytes=104857600
#单个日志文件大小,默认1G
delete.retention.ms=86400000
#对于压缩日志保留的最长时间,也是客户端消费消息的最长时间,与retention.ms的区别在于一个控制未压缩数据,一个控制压缩后的数据
retention.ms=86400000
#如果使用“delete”的retention策略,这项配置就是指删除日志前日志保存的时间
cleanup.policy=delete
#默认方式 delete 将会丢弃旧的部分 compact 将会进行日志压缩
compression.type=producer
#压缩类型,此配置接受标准压缩编码 gzip, snappy, lz4 ,另外接受 uncompressed 相当于不压缩, producer 意味着压缩类型由producer指定
./zookeeper-shell.sh test.myzk.com:2181/kafkacluster
#查看zk中kafka集群信息
85后,大专学历,中原人士,家里没矿。
由于年轻时长的比较帅气,导致在别人眼里,我一直不谈恋爱的原因是清高,实则是自己的小自卑。最大的人生目标就是找一个相知相爱相容的人,共度余生。
和人相处时如果能感受到真诚,会非常注重彼此的关系,对别人没有什么心机,即使有利益冲突,一般也会以和为贵,因为在这个世界上,物质的东西,从来不会吸引到我。
特别迷恋那些大山大水,如果现在还能隐居,可能早就去了。对那些宏伟的有底蕴的人文景观比较不感冒。
从事于IT行业,却一直对厨房念念不忘,由于身材魁梧,总觉得自己上辈子是个将军,可惜这辈子没当兵,也不会打架。