添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

从官网下载包文件后,可以直接在本地启动一个实例:

# 先启动 zookeeper 服务
kafka/bin/zookeeper-server-start.sh kafka/config/zookeeper.properties
# 然后启动 Kafka server
kafka/bin/kafka-server-start.sh kafka/config/server.properties 

Kafka 使用测试

Kafka 安装包提供了一些测试用的 shell 脚本,可以直接使用

# 创建一个 Topic
kafka/bin/kafka-topics.sh  --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
# 查看已经创建的 Topic
kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
# 创建生产者,运行下面命令后,直接在 cmd 敲内容 + Enter
kafka/bin/kafka-console-producer.sh  --broker-list localhost:9092 --topic test
# 创建消费者
kafka/bin/kafka-console-consumer.sh  --bootstrap-server localhost:9092 --topic  test --from-beginning 

配置 Kafka Server SASL/PLAIN 认证

配置认证好像比较复杂,网上的文章以及官方文档,写的内容都很多,理解起来费劲,最后明白只需要看这里的配置就行: https://kafka.apache.org/documentation/#security_sasl_plain

首先创建配置文件 kafka/config/kafka_server_jaas.conf

cd kafka_2.12-2.8.0/config
vim kafka_server_jaas.conf
# 内容如下
KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="adminpass"
    user_admin="adminpass"
    user_test="testpass";
KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="adminpass"
    user_admin="adminpass"
    user_test="testpass";
Client {
  org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="adminpass"
    user_admin="adminpass";
Server {
  org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="adminpass"
    user_admin="adminpass";

这里定义用户的方式非常奇葩,很难理解。username 和 password 两字段定义 Kafka brokers 内部沟通的用户密码,user_用户名 配置的是 client 连接 broker 时用的用户、密码。据我尝试,必须定义一个 username 用户对应的 user_ 字段,否则连不上。就像上面,有个 username=”testuser” ,所以必须再定义一次 user_testuser 且密码保持一致。此外可以再添加新用户,如添加 user_alice=”alice-secret” 。

KafkaServer 和 KafkaClient 是 Kafka Brokers 之间,以及 Kafka 客户端通讯的配置,Server 和 Client 是 Kafka 与 Zookeeper 之间的配置。注意这里和 Zookeeper 配置的 jaas 有点不一样,Kafka 这里是 org.apache.kafka.common.security.plain.PlainLoginModule ,Zookeeper 是 org.apache.zookeeper.server.auth.DigestLoginModule 。

然后编辑启动脚本 kafka/bin/kafka-server-start.sh ,在后最添加一行 export 语句

export KAFKA_OPTS="-Djava.security.auth.login.config=$base_dir/../config/kafka_server_jaas.conf" exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"

再编辑配置文件 kafka/config/server.properties

listeners=SASL_PLAINTEXT://host.name:port
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=false
# 参数 allow.everyone.if.no.acl.found
# 设置为 true,ACL 机制改为黑名单机制,只有黑名单中的用户无法访问
# 设置为 false,ACL 机制改为白名单机制,只有白名单中的用户可以访问,默认值为 false
auto.create.topics.enable=false
super.users=User:admin
zookeeper.set.acl=true

最后再重新启动 kafka 就可以了。我们在 jaas 文件中配置了两用户,admin 和 test,其中我们设置了 admin 为超级用户。后面我们创建 topic 和 ACL 授权都将以 admin 用户来操作,而 test 用户只有读写 topic 权限,无法删除 topic 。

配置 Zookeeper

由于 Kafka 的 metadata 数据是保存在 zookeeper 中的,所以需要设置 zookeeper 支持 SASL 验证,然后配置权限,禁止未登录用户随便删除 Topic 等。

编辑配置文件 zookeeper/conf/zoo.cfg

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000

其中 authProvider.1 这里的 .1 是 server_id ,多个 zookeeper 节点每个都要配置,例如你有 3 台 zk,那么需要再加上 authProvider.2 和 authProvider.3 等。

添加 jaas 配置文件 zookeeper/conf/zookeeper_jaas.conf

Client {
  org.apache.zookeeper.server.auth.DigestLoginModule required
    user_admin="adminpass";
Server {
  org.apache.zookeeper.server.auth.DigestLoginModule required
    user_admin="adminpass";

修改 zookeeper/bin/zkEnv.sh ,添加一行

export SERVER_JVMFLAGS="-Djava.security.auth.login.config=${ZOOBINDIR}/../conf/zookeeper_jaas.conf $SERVER_JVMFLAGS"

这里因为我是用的单独的 Zookeeper 程序包,如果你用的是 Kafka 自带的 Zookeeper,那 SERVER_JVMFLAGS 要改成 KAFKA_OPTS 。滚动重启 Zookeeper 。

配置 zkCli.sh 使用上鉴权。创建配置文件 zookeeper/conf/adminclient_jaas.conf

Client {
  org.apache.zookeeper.server.auth.DigestLoginModule required
    username="admin"
    password="adminpass";

然后修改 zookeeper/bin/zkCli.sh ,在 java 后面添加

"$JAVA" "-Djava.security.auth.login.config=${ZOOBIN}/../conf/adminclient_jaas.conf" 后面接上原来的参数

配置 Kafka ACL 权限

先修改 Kafka 里的 kafka/bin/zookeeper-security-migration.sh 文件,添加

export KAFKA_OPTS="-Djava.security.auth.login.config=/data/release/dp_kafka/config/kafka_server_jaas.conf"

然后运行命令 ,参考这里的文档

kafka/bin/zookeeper-security-migration.sh --zookeeper.connect 127.0.0.1:2181 --zookeeper.acl secure

这样 Zookeeper 里面的 metadata 都加上权限了。可以上 Zookeeper 验证下

zookeeper/bin/zkCli.sh
# 登录进 shell 后,执行 getAcl 指令
getAcl /brokers/topics
'sasl,'admin
: cdrwa
'world,'anyone

为了用上 Kafka 自带的 shell 工具,我们要配置 jaas 认证,新建一个 kafka/config/adminclient-configs.conf

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
    required username="admin" password="adminpass";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --command-config kafka/config/adminclient-configs.conf --list

一些 ACL 命令使用,参考文档

# 列出 temp 这个 topic 的 ACL 列表
kafka/bin/kafka-acls.sh --bootstrap-server 127.0.0.1:9092 --command-config kafka/config/adminclient-configs.conf --list --topic "temp"
# 对 temp 这个 topic 对用户 test 授权
kafka/bin/kafka-acls.sh --bootstrap-server 127.0.0.1:9092 --command-config kafka/config/adminclient-configs.conf --add --allow-principal User:test --operation Read --operation Write --operation AlterConfigs --operation Describe --operation DescribeConfigs --operation Alter --topic "temp"
# 去掉 delete 权限
kafka/bin/kafka-acls.sh --bootstrap-server 127.0.0.1:9092 --command-config kafka/config/adminclient-configs.conf --remove --allow-principal User:test --operation Delete --topic "temp"

配置 Consumer 和 Producer

如果 Kafka 配置了认证,再用脚本消费数据就会报错

./bin/kafka-console-consumer.sh  --bootstrap-server localhost:9092 --topic  test --from-beginning 
WARN [Consumer clientId=consumer-console-consumer-93884-1, groupId=console-consumer-93884] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
# 以下是 Kafka Server 端报错日志
INFO [SocketServer listenerType=ZK_BROKER, nodeId=0] Failed authentication with /127.0.0.1 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)

修改 config/consumer.properties ,添加以下配置

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="admin" \
    password="adminpass";

config/producer.properties 参考上面一样的修改。

最后再尝试运行上面的命令。

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --consumer.config config/consumer.properties --topic  test --from-beginning 
./bin/kafka-console-producer.sh  --broker-list localhost:9092 --topic test \
  --producer.config=config/producer.properties
./bin/kafka-topics.sh --list --bootstrap-server=localhost:9092 \
  --command-config config/consumer.properties 

Python 端添加认证代码如下

from kafka import KafkaConsumer
consumer = KafkaConsumer('topic_name', group_id='test', bootstrap_servers=["127.0.0.1:9092"],
                             auto_offset_reset='earliest', enable_auto_commit=False,
                           security_protocol= 'SASL_PLAINTEXT',
                           sasl_mechanism= 'PLAIN',
                           sasl_plain_username= 'testuser',
                           sasl_plain_password= 'testpass',
for message in consumer:
        print(message)

Java 端认证代码如下

import org.apache.kafka.clients.consumer.KafkaConsumer;
String jaasTemplate = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";";
String jaasCfg = String.format(jaasTemplate, "testuser", "testpass");
props = new Properties();
props.put("bootstrap.servers", brokers);
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "PLAIN");
props.put("sasl.jaas.config", jaasCfg);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

配置管理界面 Kowl

这里使用 https://github.com/cloudhut/kowl 作为 Web 管理工具,参考项目里的 Dockerfile,可以直接从源码编译运行

git clone https://github.com/cloudhut/kowl.git
cd kowl/backend 
go build -o ./bin/kowl ./cmd/api/
# 可执行文件在 ./kowl/backend/bin/kowl
cp ./kowl/backend/bin/kowl ./kowl/
# 编译前端
cd kowl/frontend
npm install && npm run build
cp -r build ./kowl
# 创建配置文件
cd kowl && wget https://github.com/cloudhut/kowl/blob/master/docs/config/kowl.yaml
# 运行 kowl
./kowl -config.filepath kowl.yaml
# 最终目录结构是,kowl 可执行文件与前端 build 文件夹在一起

kowl 示例配置

kafka:
  brokers:
    - 127.0.0.1:9092
  # clientId: kowl
  # rackId: # In multi zone Kafka clusters you can reduce traffic costs by consuming messages from replica brokers in the same zone
  sasl:
    enabled: true
    username: testuser
    password: testpass # This can be set via the --kafka.sasl.password flag as well
    mechanism: PLAIN # PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI, OAUTHBEARER and AWS_MSK_IAM are supported

已经加上鉴权的 zookeeper 去掉鉴权

如果对已经加上鉴权的 zookeeper、kafka 集群,现在不想要鉴权了,要回退回去,操作如下。

首先运行命令,把 kafka 在 zookeeper 里面的 metadata 数据去掉认证:

kafka/bin/zookeeper-security-migration.sh --zookeeper.connect 127.0.0.1:2181 --zookeeper.acl unsecure

去掉后,可以上 zookeeper 使用 getAcl 命令确认。

然后,修改 zookeeper 文件 zookeeper/bin/zkEnv.sh ,去掉之前加的

# 把这一行注释掉
# export SERVER_JVMFLAGS="-Djava.security.auth.login.config=${ZOOBINDIR}/../conf/zookeeper_jaas.conf $SERVER_JVMFLAGS"

滚动重启 zookeeper。

再来修改 kafka 配置,修改 kafka/config/kafka_server_jaas.conf ,删除连接 zookeeper 的配置 Client 和 Server 。再修改 kafka/config/server.properties ,删除配置项 zookeeper.set.acl=true 。

再滚动重启 kafka 。

本条目发布于。属于技术文章分类。作者是