AclAuthorizer
使用访问控制列表(ACL),后者定义一组规则描述用户可以和无法执行的操作。
此流程描述了如何在 Kafka 代理中使用
AclAuthorizer
插件时添加 ACL 规则。
规则使用
kafka-acls.sh
实用程序添加,并存储在 ZooKeeper 中。
AMQ Streams 安装
在所有用作 Kafka 代理的主机上。
在 Kafka 代理
中启用了
授权。
使用
--add
选项运行
kafka-acls.sh
。
允许使用
MyConsumerGroup
consumer 组从
myTopic
中读取
user1
和
user2
访问权限。
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zoo1.my-domain.com:2181 --add --operation Read --topic myTopic --allow-principal User:user1 --allow-principal User:user2
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zoo1.my-domain.com:2181 --add --operation Describe --topic myTopic --allow-principal User:user1 --allow-principal User:user2
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zoo1.my-domain.com:2181 --add --operation Read --operation Describe --group MyConsumerGroup --allow-principal User:user1 --allow-principal User:user2
拒绝
user1
从 IP 地址主机
127.0.0.1
读取
myTopic
的访问权限。
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zoo1.my-domain.com:2181 --add --operation Describe --operation Read --topic myTopic --group MyConsumerGroup --deny-principal User:user1 --deny-host 127.0.0.1
添加
user1
作为带有
MyConsumerGroup
的
myTopic
的消费者。
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zoo1.my-domain.com:2181 --add --consumer --topic myTopic --group MyConsumerGroup --allow-principal User:user1
此流程描述了如何在 Kafka 代理中使用
AclAuthorizer
插件时删除 ACL 规则。
使用
kafka-acls.sh
实用程序删除规则。
AMQ Streams 安装
在所有用作 Kafka 代理的主机上。
在 Kafka 代理
中启用了
授权。
添加了
ACL。
使用
--remove
选项运行
kafka-acls.sh
。
删除 ACL,允许
user1
和
user2
使用
MyConsumerGroup
消费者组从
myTopic
读取。
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zoo1.my-domain.com:2181 --remove --operation Read --topic myTopic --allow-principal User:user1 --allow-principal User:user2
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zoo1.my-domain.com:2181 --remove --operation Describe --topic myTopic --allow-principal User:user1 --allow-principal User:user2
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zoo1.my-domain.com:2181 --remove --operation Read --operation Describe --group MyConsumerGroup --allow-principal User:user1 --allow-principal User:user2
删除 ACL 添加
user1
作为带有
MyConsumerGroup
的
myTopic
的消费者。
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zoo1.my-domain.com:2181 --remove --consumer --topic myTopic --group MyConsumerGroup --allow-principal User:user1
移除 ACL 拒绝
user1
访问,以从 IP 地址主机
127.0.0.1
读取
myTopic
。
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zoo1.my-domain.com:2181 --remove --operation Describe --operation Read --topic myTopic --group MyConsumerGroup --deny-principal User:user1 --deny-host 127.0.0.1
在 Kafka 和 ZooKeeper 间启用身份验证时,您可以使用 ZooKeeper Access Control List (ACL)规则自动控制对 ZooKeeper 中存储的 Kafka 元数据的访问。
ZooKeeper ACL 规则的执行由
config/server.properties
Kafka 配置文件中的
zookeeper.set.acl
属性控制。
默认情况下,这个属性会被默认禁用,并通过设置为
true
来启用:
zookeeper.set.acl=true
如果启用了 ACL 规则,当在 ZooKeeper 中创建
znode
时,只有创建它的 Kafka 用户才可以修改或删除它。所有其他用户具有只读访问权限。
Kafka 仅为新创建的 ZooKeeper
znodes
设置 ACL 规则。如果仅在集群首次启动后启用 ACL,
zookeeper-security-migration.sh
工具可以在所有现有
znodes
上设置 ACL。
5.4.7.2. 为新的 Kafka 集群启用 ZooKeeper ACL
此流程描述了如何在新 Kafka 集群在 Kafka 配置中启用 ZooKeeper ACL。仅在 Kafka 集群首次启动前使用这个步骤。有关在已经运行的集群中启用 ZooKeeper ACL,请参阅
第 5.4.7.3 节 “在现有 Kafka 集群中启用 ZooKeeper ACL”
。
AMQ Streams
安装
在所有将用作 Kafka 代理的主机上。
zookeeper 集群已
配置并运行
。
在 ZooKeeper
中启用了
客户端到服务器身份验证。
在 Kafka 代理
中启用 z
ookeeper 身份验证。
Kafka 代理还没有启动。
编辑
/opt/kafka/config/server.properties
Kafka 配置文件,在所有集群节点上将
zookeeper.set.acl
字段设置为
true
。
zookeeper.set.acl=true
启动 Kafka 代理。
5.4.7.3. 在现有 Kafka 集群中启用 ZooKeeper ACL
此流程描述了如何在 Kafka 配置中启用运行 Kafka 集群的 ZooKeeper ACL。使用
zookeeper-security-migration.sh
工具,在所有存在的
znodes
中设置 ZooKeeper ACL。
zookeeper-security-migration.sh
可作为 AMQ Streams 的一部分,并可在
bin
目录中找到。
Kafka 集群已
配置并运行
。
启用 ZooKeeper ACL
-
编辑
/opt/kafka/config/server.properties
Kafka 配置文件,在所有集群节点上将
zookeeper.set.acl
字段设置为
true
。
zookeeper.set.acl=true
-
重启所有 Kafka 代理。
有关在多节点集群中重启代理的详情,请参考
第 3.3 节 “执行安全滚动重启 Kafka 代理”
。
使用
zookeeper-security-migration.sh
工具,在所有现有的 ZooKeeper
znodes
上设置 ACL。
su - kafka
cd /opt/kafka
KAFKA_OPTS="-Djava.security.auth.login.config=./config/jaas.conf"; ./bin/zookeeper-security-migration.sh --zookeeper.acl=secure --zookeeper.connect=<ZooKeeperURL>
su - kafka
cd /opt/kafka
KAFKA_OPTS="-Djava.security.auth.login.config=./config/jaas.conf"; ./bin/zookeeper-security-migration.sh --zookeeper.acl=secure --zookeeper.connect=zoo1.my-domain.com:2181
exit
AMQ Streams 支持加密和验证,它被配置为监听程序配置的一部分。
Kafka 代理中的加密和身份验证是为每个监听程序配置。有关 Kafka 侦听器配置的更多信息,请参阅
第 5.4.2 节 “监听器”
。
Kafka 代理中的每个监听程序都使用自己的安全协议进行配置。配置属性
监听程序.security.protocol.map
定义哪个监听程序使用哪个安全协议。它将每个侦听器名称映射到其安全协议。支持的安全协议有:
-
PLAINTEXT
-
无加密或身份验证的监听程序。
使用 TLS 加密(可选)使用 TLS 客户端证书进行身份验证的监听程序。
-
SASL_PLAINTEXT
-
无加密功能的监听程序,但使用基于 SASL 的身份验证。
-
SASL_SSL
-
基于 TLS 的加密和 SASL 身份验证的监听程序。
根据以下
监听器
配置:
listeners=INT1://:9092,INT2://:9093,REPLICATION://:9094
listener.security.protocol.map
可能如下所示:
listener.security.protocol.map=INT1:SASL_PLAINTEXT,INT2:SASL_SSL,REPLICATION:SSL
这会将侦听器
INT1
配置为使用 SASL 身份验证的未加密的连接,
INT2
使用 SASL 身份验证的加密连接,
REPLICATION
接口使用 TLS 加密(可能与 TLS 客户端身份验证一起使用)。相同的安全协议可多次使用。以下示例也是有效的配置:
listener.security.protocol.map=INT1:SSL,INT2:SSL,REPLICATION:SSL
此类配置为所有接口使用 TLS 加密和 TLS 身份验证。下面的章节将更详细地阐述如何配置 TLS 和 SASL。
Kafka 支持 TLS 来加密与 Kafka 客户端的通信。
为了使用 TLS 加密和解密,必须提供含有私钥和公钥的密钥存储。这通常使用 Java 密钥存储(JKS)格式的文件来完成。
ssl.keystore.location
属性中设置此文件的路径。
ssl.keystore.password
属性应当用于设置保护密钥存储的密码。例如:
ssl.keystore.location=/path/to/keystore/server-1.jks
ssl.keystore.password=123456
在某些情况下,使用额外的密码来保护私钥。任何此类密码都可使用
ssl.key.password
属性进行设置。
Kafka 可以使用由证书颁发机构签名的密钥和自签名密钥。使用证书颁发机构签名的密钥应始终是首选的方法。为了允许客户端验证其正在连接的 Kafka 代理的身份,证书应始终包含公告的主机名 (CN) 或 Subject Alternative Name(SAN)。
对于不同的监听器,可以使用不同的 SSL 配置。以
ssl 开头的所有选项均可使用
listener.name.<NameOfTheListener>.
前缀,其中监听器的名称必须始终处于小写状态。这将覆盖该特定监听程序的默认 SSL 配置。以下示例演示了如何为不同的监听程序使用不同的 SSL 配置:
listeners=INT1://:9092,INT2://:9093,REPLICATION://:9094
listener.security.protocol.map=INT1:SSL,INT2:SSL,REPLICATION:SSL
# Default configuration - will be used for listeners INT1 and INT2
ssl.keystore.location=/path/to/keystore/server-1.jks
ssl.keystore.password=123456
# Different configuration for listener REPLICATION
listener.name.replication.ssl.keystore.location=/path/to/keystore/server-1.jks
listener.name.replication.ssl.keystore.password=123456
-
ssl.cipher.suites
-
启用的密码套件列表。每个密码套件都是用于 TLS 连接的身份验证、加密、MAC 和密钥交换算法的组合。默认情况下启用所有可用的加密套件。
-
ssl.enabled.protocols
-
启用的 TLS/SSL 协议列表。默认为
TLSv1.2,TLSv1.1,TLSv1
。
这个步骤描述了如何在 Kafka 代理中启用加密。
AMQ Streams
安装
在所有将用作 Kafka 代理的主机上。
为集群中的所有 Kafka 代理生成 TLS 证书。证书应在其通用名称或主题备用名称中公告和 bootstrap 地址。
编辑所有集群节点上的
/opt/kafka/config/server.properties
Kafka 配置文件:
更改
监听程序.security.protocol.map
字段,以指定您要使用 TLS 加密的监听程序的
SSL
协议。
使用代理证书将
ssl.keystore.location
选项设置为 JKS 密钥存储的路径。
将
ssl.keystore.password
选项设置为用于保护密钥存储的密码。
listeners=UNENCRYPTED://:9092,ENCRYPTED://:9093,REPLICATION://:9094
listener.security.protocol.map=UNENCRYPTED:PLAINTEXT,ENCRYPTED:SSL,REPLICATION:PLAINTEXT
ssl.keystore.location=/path/to/keystore/server-1.jks
ssl.keystore.password=123456
(重新)启动 Kafka 代理
对于身份验证,您可以使用:
TLS 客户端身份验证基于加密连接的 X.509 证书
支持的 Kafka SASL(Simple Authentication and Security Layer)机制
基于 OAuth 2.0 令牌的身份验证
TLS 客户端身份验证只能在已使用 TLS 加密的连接中使用。要使用 TLS 客户端身份验证,可以为代理提供带有公钥的信任存储。这些密钥可用于验证连接到代理的客户端。信任存储应该以 Java Keystore (JKS)格式提供,并且应包含证书颁发机构的公钥。所有使用由信任存储中所含证书颁发机构签名的公钥和私钥的客户端都将被验证。truststore 的位置使用字段
ssl.truststore.location
进行设置。如果 truststore 是受密码保护的,则应在
ssl.truststore.password
属性中设置密码。例如:
ssl.truststore.location=/path/to/keystore/server-1.jks
ssl.truststore.password=123456
配置 truststore 后,必须使用
ssl.client.auth
属性启用 TLS 客户端身份验证。此属性可以设置为三个不同的值之一:
TLS 客户端身份验证关闭。(默认值)
requested
TLS 客户端身份验证是可选的。客户端需要使用 TLS 客户端证书进行身份验证,但可以选择.
required
需要客户端才能使用 TLS 客户端证书进行身份验证。
当客户端使用 TLS 客户端身份验证进行身份验证时,经过身份验证的主体名称是与经过身份验证的客户端证书区分名称。例如,具有可分辨名称
CN=someuser
的用户将使用以下主体
CN=someuser,OU=Unknown,O=Unknown,ST=Unknown,ST=Unknown,C=Unknown,C=Unknown
.如果不使用 TLS 客户端身份验证,并且 SASL 被禁用时,主体名称为
ANONYMOUS
。
SASL 身份验证使用 Java 身份验证和授权服务(JAAS)配置。JAAS 也用于身份验证 Kafka 和 ZooKeeper 连接。JAAS 使用自己的配置文件。此文件的建议位置为
/opt/kafka/config/jaas.conf
。该文件必须可由
kafka
用户读取。在运行 Kafka 时,使用 Java 系统属性
java.security.auth.login.config
来指定此文件的位置。在启动代理节点时,必须将此属性传递给 Kafka:
KAFKA_OPTS="-Djava.security.auth.login.config=/path/to/my/jaas.config"; bin/kafka-server-start.sh
通过普通未加密的连接以及 TLS 连接支持 SASL 身份验证。对于每个侦听器,可以单独启用 SASL。要启用它,
listener.security.protocol.map
中的安全协议必须是
SASL_PLAINTEXT
或
SASL_SSL
。
Kafka 中的 SASL 身份验证支持多种不同的机制:
-
PLAIN
-
根据用户名和密码实施身份验证。用户名和密码存储在本地 Kafka 配置中。
-
SCRAM-SHA-256
和
SCRAM-SHA-512
-
使用 Salted Challenge Response Authentication Mechanism (SCRAM) 实现验证。SCRAM 凭证集中存储在 ZooKeeper 中。当 ZooKeeper 集群节点在私有网络中隔离时,可以使用 SCRAM。
-
GSSAPI
-
实现对 Kerberos 服务器的验证。
PLAIN
机制以未加密的格式通过网络发送用户名和密码。因此,它应该只与 TLS 加密结合使用。
SASL 机制通过 JAAS 配置文件进行配置。Kafka 使用名为
KafkaServer
的 JAAS 上下文。在 JAAS 中配置后,必须在 Kafka 配置中启用 SASL 机制。这可以通过
sasl.enabled.mechanisms
属性完成。这个属性包含已启用机制的逗号分隔列表:
sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512
如果使用了用于 inter-broker 进行通信的监听程序,则必须使用 属性
sasl.mechanism.inter.broker.protocol
以指定应使用的 SASL 机制。例如:
sasl.mechanism.inter.broker.protocol=PLAIN
将用于 openshift-broker 通信的用户名和密码,该通信必须使用字段
username
和
password
在
KafkaServer
JAAS 上下文中指定。
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
user_admin="123456"
user_user1="123456"
user_user2="123456";
与用户数据库的 JAAS 配置文件应保持在所有 Kafka 代理上同步。
当 SASL PLAIN 也用于内部代理身份验证时,username
和 password
属性应包含在 JAAS 上下文中:
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="123456"
user_admin="123456"
user_user1="123456"
user_user2="123456";
};
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required;
当在 Kafka 配置文件中启用 SASL 身份验证时,可以列出两个 SCRAM 机制。但是,只能为代理通信选择其中之一。例如:
sasl.enabled.mechanisms=SCRAM-SHA-256,SCRAM-SHA-512
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
SCRAM 机制的用户凭证存储在 ZooKeeper 中。kafka-configs.sh
工具可用于管理它们。例如,运行以下命令来添加用户 user1,密码为 123456:
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'SCRAM-SHA-256=[password=123456],SCRAM-SHA-512=[password=123456]' --entity-type users --entity-name user1
要删除用户凭证,请使用:
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name user1
KafkaServer {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/security/keytabs/kafka_server.keytab"
principal="kafka/kafka1.hostname.com@EXAMPLE.COM";
Kerberos 主体中的域名必须始终为大写。
除了 JAAS 配置外,还需要在 Kafka 配置中的 sasl.kerberos.service.name
属性中指定 Kerberos 服务名称:
sasl.enabled.mechanisms=GSSAPI
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.kerberos.service.name=kafka
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
user_admin="123456"
user_user1="123456"
user_user2="123456";
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/security/keytabs/kafka_server.keytab"
principal="kafka/kafka1.hostname.com@EXAMPLE.COM";
org.apache.kafka.common.security.scram.ScramLoginModule required;
启用多个机制后,客户端就可以选择要使用的机制。
这个步骤描述了如何在 Kafka 代理中启用 TLS 客户端身份验证。
AMQ Streams
安装
在所有将用作 Kafka 代理的主机上。
启用
TLS 加密。
准备包含用于为用户证书签名的证书颁发机构的公钥的 JKS 信任存储。
编辑所有集群节点上的
/opt/kafka/config/server.properties
Kafka 配置文件:
使用用户证书的证书颁发机构,将
ssl.truststore.location
选项设置为 JKS 信任存储的路径。
将
ssl.truststore.password
选项设置为用于保护信任存储的密码。
将
ssl.client.auth
选项设置为
required
。
ssl.truststore.location=/path/to/truststore.jks
ssl.truststore.password=123456
ssl.client.auth=required
(重新)启动 Kafka 代理
5.4.8.6. 启用 SASL PLAIN 身份验证
这个步骤描述了如何在 Kafka 代理中启用 SASL PLAIN 身份验证。
AMQ Streams
安装
在所有将用作 Kafka 代理的主机上。
编辑或创建
/opt/kafka/config/jaas.conf
JAAS 配置文件。此文件应包含您的所有用户及其密码。确保所有 Kafka 代理上,确保这个文件都相同。
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
user_admin="123456"
user_user1="123456"
user_user2="123456";
编辑所有集群节点上的 /opt/kafka/config/server.properties
Kafka 配置文件:
更改 监听程序.security.protocol.map
字段,为您要使用 SASL PLAIN 身份验证的监听程序指定 SASL_PLAINTEXT
或 SASL_SSL
协议。
将 sasl.enabled.mechanisms
选项设置为 PLAIN
。
listeners=INSECURE://:9092,AUTHENTICATED://:9093,REPLICATION://:9094
listener.security.protocol.map=INSECURE:PLAINTEXT,AUTHENTICATED:SASL_PLAINTEXT,REPLICATION:PLAINTEXT
sasl.enabled.mechanisms=PLAIN
使用 KAFKA_OPTS 环境变量(重新)启动 Kafka 代理,将 JAAS 配置传递给 Kafka 代理。
su - kafka
export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/jaas.conf"; /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
5.4.8.7. 启用 SASL SCRAM 身份验证
这个步骤描述了如何在 Kafka 代理中启用 SASL SCRAM 身份验证。
AMQ Streams
安装
在所有将用作 Kafka 代理的主机上。
编辑或创建
/opt/kafka/config/jaas.conf
JAAS 配置文件。为
KafkaServer
上下文启用
ScramLoginModule
。确保所有 Kafka 代理上,确保这个文件都相同。
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required;
编辑所有集群节点上的 /opt/kafka/config/server.properties
Kafka 配置文件:
更改 监听程序.security.protocol.map
字段,为您要使用 SASL SCRAM 身份验证的监听程序指定 SASL_PLAINTEXT
或 SASL_SSL
协议。
将 sasl.enabled.mechanisms
选项设置为 SCRAM-SHA-256
或 SCRAM-SHA-512
。
listeners=INSECURE://:9092,AUTHENTICATED://:9093,REPLICATION://:9094
listener.security.protocol.map=INSECURE:PLAINTEXT,AUTHENTICATED:SASL_PLAINTEXT,REPLICATION:PLAINTEXT
sasl.enabled.mechanisms=SCRAM-SHA-512
使用 KAFKA_OPTS 环境变量(重新)启动 Kafka 代理,将 JAAS 配置传递给 Kafka 代理。
su - kafka
export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/jaas.conf"; /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
5.4.8.8. 添加 SASL SCRAM 用户
这个步骤描述了如何使用 SASL SCRAM 为身份验证添加新用户。
AMQ Streams
安装
在所有将用作 Kafka 代理的主机上。
启用
SASL SCRAM 身份验证。
使用
kafka-configs.sh
工具添加新的 SASL SCRAM 用户。
bin/kafka-configs.sh --bootstrap-server <broker_address> --alter --add-config 'SCRAM-SHA-512=[password=<Password>]' --entity-type users --entity-name <Username>
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'SCRAM-SHA-512=[password=123456]' --entity-type users --entity-name user1
5.4.8.9. 删除 SASL SCRAM 用户
这个步骤描述了如何在使用 SASL SCRAM 身份验证时删除用户。
AMQ Streams
安装
在所有将用作 Kafka 代理的主机上。
启用
SASL SCRAM 身份验证。
使用
kafka-configs.sh
工具删除 SASL SCRAM 用户。
/opt/kafka/bin/kafka-configs.sh --bootstrap-server <broker_address> --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name <Username>
/opt/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name user1
5.4.9. 使用基于 OAuth 2.0 令牌的身份验证
AMQ Streams 支持使用
OAUTHBEARER
和
PLAIN
机制使用
OAuth 2.0 身份验证
。
OAuth 2.0 启用应用程序之间的基于令牌的标准化身份验证和授权,使用中央授权服务器签发对资源有限访问权限的令牌。
Kafka 代理和客户端都需要配置为使用 OAuth 2.0。您可以配置 OAuth 2.0 身份验证,然后配置
OAuth 2.0 授权
。
无论使用的授权服务器是什么,OAuth 2.0 身份验证可与
基于 ACL 的 Kafka 授权
结合使用。
使用 OAuth 2.0 身份验证,应用程序客户端可以在不公开帐户凭据的情况下访问应用服务器(称为
资源服务器
)上的资源。
应用程序客户端通过访问令牌作为身份验证方法传递,应用服务器也可以用来决定要授予的访问权限级别。授权服务器处理访问权限的授予和询问有关访问权限的查询。
在 AMQ Streams 上下文中:
Kafka 代理充当 OAuth 2.0 资源服务器
Kafka 客户端充当 OAuth 2.0 应用程序客户端
Kafka 客户端在 Kafka 代理验证。代理和客户端根据需要与 OAuth 2.0 授权服务器通信,以获取或验证访问令牌。
对于 AMQ Streams 的部署,OAuth 2.0 集成提供:
服务器端 OAuth 2.0 支持 Kafka 代理
客户端 OAuth 2.0 支持 Kafka MirrorMaker、Kafka Connect 和 Kafka Bridge
RHEL 上的 AMQ Streams 包括两个 OAuth 2.0 库:
-
kafka-oauth-client
-
提供名为
io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
的自定义登录回调处理器类。要处理
OAUTHBEARER
身份验证机制,请使用 Apache Kafka 提供的
OAuthBearerLoginModule
的登录回调处理程序。
-
kafka-oauth-common
-
一个帮助库,提供
kafka-oauth-client
库所需的一些功能。
提供的客户端库还依赖于一些额外的第三方库,例如:
keycloak-core
、
jackson-databind
和
slf4j-api
。
我们建议使用 Maven 项目打包您的客户端,以确保包含所有依赖项库。依赖项库可能会在以后的版本中有所变化。
OAuth 2.0 站点
AMQ Streams 支持 OAUTHBEARER 和 PLAIN 机制进行 OAuth 2.0 身份验证。这两种机制都允许 Kafka 客户端使用 Kafka 代理建立经过身份验证的会话。客户端、授权服务器和 Kafka 代理之间的身份验证流因每种机制而异。
我们建议您将客户端配置为尽可能使用 OAUTHBEARER。OAUTHBEARER 提供比 PLAIN 更高的安全性,因为客户端凭证
不会
与 Kafka 代理共享。考虑仅在不支持 OAUTHBEARER 的 Kafka 客户端中使用 PLAIN。
您可以将 Kafka 代理监听程序配置为使用 OAuth 2.0 身份验证来连接客户端。如果需要,您可以使用同一
oauth
侦听器上的 OAUTHBEARER 和 PLAIN 机制。在
oauth
侦听器配置中明确指定用于支持每种机制的属性。
listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER
许多 Kafka 客户端工具都使用在协议级别为 OAUTHBEARER 提供基本支持的库。为了支持应用程序开发,AMQ Streams 为上游 Kafka Client Java 库(但不适用于其他库)提供了一个
OAuth 回调处理器
。因此,您不需要编写自己的回调处理程序。应用客户端可以使用回调处理程序来提供访问令牌。使用 Go 等其他语言编写的客户端必须使用自定义代码连接到授权服务器并获取访问令牌。
使用 OAUTHBEARER 时,客户端发起与 Kafka 代理用于凭证交换的会话,其中凭证采用由回调处理程序提供的 bearer 令牌的形式。使用回调时,您可以使用以下三种方法之一配置令牌置备:
客户端 ID 和 Secret (通过使用
OAuth 2.0 客户端证书
机制)
在配置时手动获取的长期访问令牌
长期刷新令牌,在配置时手动获取
OAUTHBEARER 身份验证只能由 Kafka 客户端用来支持协议级别的 OAUTHBEARER 机制。
listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER,PLAIN
PLAIN 是所有 Kafka 客户端工具使用的简单身份验证机制。要启用 PLAIN 以用于 OAuth 2.0 身份验证,AMQ Streams 提供了
OAuth 2.0 over PLAIN
服务器端的回调。
通过 PLAIN 的 AMQ Streams 实现,客户端凭证不会存储在 ZooKeeper 中。相反,客户端凭证会在兼容授权服务器后进行集中处理,这与使用 OAUTHBEARER 身份验证类似。
当通过 PLAIN 回调与 OAuth 2.0 一起使用时,Kafka 客户端使用以下任一方法与 Kafka 代理进行身份验证:
客户端 ID 和 secret (使用
OAuth 2.0 客户端证书
机制)
在配置时手动获取的长期访问令牌
对于这两种方法,客户端必须提供 PLAIN
username
和
password
属性,将凭证传递给 Kafka 代理。客户端使用这些属性来传递客户端 ID 和 secret 或用户名和访问令牌。
客户端 ID 和 secret 用于获取访问令牌。
访问令牌作为
密码
属性值传递。您可以使用 或不使用
$accessToken:
前缀传递访问令牌。
如果您在监听器配置令牌端点(
oauth.token.endpoint.uri
),则需要前缀。
如果您没有在监听器配置中配置令牌端点(
oauth.token.endpoint.uri
),则不需要前缀。Kafka 代理将密码解析为原始访问令牌。
如果
密码
被设置为访问令牌,则
用户名
必须设置为 Kafka 代理从访问令牌获取的相同主体名称。您可以使用
oauth.username.claim
,
oauth.fallback.username.claim
,
oauth.fallback.username.prefix
, 和
oauth.userinfo.endpoint.uri
属性在监听器中指定用户名提取选项。用户名提取过程还取决于您的授权服务器;特别是,它将客户端 ID 映射到帐户名称。
5.4.9.1.1. 使用属性或变量配置 OAuth 2.0
您可以使用 Java Authentication and Authorization Service(JAAS)属性或环境变量来配置 OAuth 2.0 设置。
JAAS 属性在
server.properties
配置文件中配置,并传递为
listener.name.
LISTENER-NAME
.oauthbearer.sasl.jaas.config
属性的键值对。
如果使用环境变量,您仍然需要在
server.properties
文件中提供
listener.name.
LISTENER-NAME
.oauthbearer.sasl.jaas.config
属性,但您可以忽略其他 JAAS 属性。
您可以使用大写或大写的环境变量命名惯例。
AMQ Streams OAuth 2.0 库使用以以下内容开头的属性:
OAuth.
用于配置身份验证
strimzi.
到
configure OAuth 2.0 authorization
OAuth 2.0 Kafka 代理配置
5.4.9.2. OAuth 2.0 Kafka 代理配置
OAuth 2.0 身份验证的 Kafka 代理配置涉及:
在授权服务器中创建 OAuth 2.0 客户端
在 Kafka 集群中配置 OAuth 2.0 身份验证
与授权服务器的关系,Kafka 代理和 Kafka 客户端都被视为 OAuth 2.0 客户端。
5.4.9.2.1. 授权服务器上的 OAuth 2.0 客户端配置
要配置 Kafka 代理以验证会话启动期间收到的令牌,建议的做法是在授权服务器中创建一个 OAuth 2.0
client
定义(配置为
confidential
),并启用了以下客户端凭证:
kafka-broker
的客户端 ID (例如)
客户端 ID 和 secret 作为身份验证机制
只有在使用授权服务器的非公共内省端点时,才需要使用客户端 ID 和 secret。当使用公共授权服务器端点时,通常会要求凭据,因为快速本地 JWT 令牌验证一样。
5.4.9.2.2. Kafka 集群中的 OAuth 2.0 身份验证配置
要在 Kafka 集群中使用 OAuth 2.0 身份验证,您可以在 Kafka
server.properties
文件中为 Kafka 集群启用 OAuth 身份验证监听程序配置。至少需要配置。您还可以配置 TLS 侦听器,其中 TLS 用于代理通信。
您可以使用以下方法之一为授权服务器配置令牌验证代理:
快速本地令牌验证:JW
KS
端点和签名的 JWT 格式的访问令牌
内省
端点
您可以配置 OAUTHBEARER 或 PLAIN 身份验证,或两者。
以下示例显示了应用
全局
监听器配置的最低配置,这意味着,内部代理间通信通过与应用程序客户端相同的监听程序进行。
这个示例还显示了一个特定监听程序的 OAuth 2.0 配置,在其中您可以指定
listener.name.
LISTENER-NAME
.sasl.enabled.mechanisms
而不是
sasl.enabled.mechanisms
。
LISTENER-NAME
是监听器的不区分大小写的名称。在这里,我们将侦听器的
CLIENT
命名为
listener.name.client.sasl.enabled.mechanisms
。
这个示例使用 OAUTHBEARER 身份验证。
5.4.9.2.3. 快速本地 JWT 令牌验证配置
快速本地 JWT 令牌验证会在本地检查 JWT 令牌签名。
本地检查可确保令牌:
符合 type,具体为访问令牌包含
Bearer
的(
typ
)声明值
为有效(不是过期)
具有与
有效IssuerURI
匹配的签发者
在配置侦听器时,您可以指定
有效的签发者 URI
,以便拒绝任何未由授权服务器发布的令牌。
授权服务器不需要在快速本地 JWT 令牌验证过程中联系。您可以通过指定由 OAuth 2.0 授权服务器公开的
JWKs 端点 URI
来激活快速本地 JWT 令牌验证。端点包含验证已签名的 JWT 令牌的公钥,这些令牌由 Kafka 客户端作为凭证发送。
与授权服务器的所有通信都应使用 HTTPS 执行。
对于 TLS 侦听器,您可以配置证书
信任存储
并指向信任存储文件。
5.4.9.2.4. OAuth 2.0 内省端点配置
使用 OAuth 2.0 内省端点验证的令牌验证会将收到的访问令牌视为不透明的。Kafka 代理将访问令牌发送到内省端点,该端点使用验证所需的令牌信息进行响应。重要的是,如果特定的访问令牌有效,它会返回最新的信息,以及令牌过期时的相关信息。
要配置基于 OAuth 2.0 内省的验证,您可以指定一个
内省端点 URI
,而不是为快速本地 JWT 令牌验证指定的 JWKs 端点 URI。根据授权服务器,通常必须指定
client ID
和
client secret
,因为内省端点通常受到保护。
您可以将 OAuth 监听程序配置为使用 Kafka 客户端和 Kafka 代理之间的 OAuth 2.0 会话的 Kafka
会话重新身份验证
。这个机制在指定的时间后会强制在客户端和代理之间执行经过身份验证的会话。当会话过期时,客户端会立即通过重复使用现有连接来启动一个新会话,而不是将其丢弃。
默认情况下,会话重新身份验证将被禁用。您可以在
server.properties
文件中启用它。为启用了 OAUTHBEARER 或 PLAIN 的 TLS 侦听器设置 connection
.max.reauth.ms
属性,作为 SASL 机制。
您可以指定每个监听程序的会话重新身份验证。例如:
listener.name.client.oauthbearer.connections.max.reauth.ms=3600000
会话重新身份验证必须由客户端使用的 Kafka 客户端库支持。
会话重新身份验证可用于快速
本地 JWT
或
内省端点
令牌验证。
5.4.9.4. OAuth 2.0 Kafka 客户端配置
Kafka 客户端配置有:
从授权服务器获取有效访问令牌(客户端 ID 和 Secret)所需的凭证
使用授权服务器提供的工具获取有效的长期访问令牌或刷新令牌
发送给 Kafka 代理的唯一信息是访问令牌。用于与授权服务器进行身份验证的凭证从不会发送到代理。
当客户端获取访问令牌时,不需要进一步与授权服务器通信。
最简单的机制是使用客户端 ID 和 Secret 进行身份验证。使用长期的访问令牌或长期的刷新令牌会增加复杂性,因为对授权服务器工具还有额外的依赖。
如果使用长期的访问令牌,可能需要在授权服务器中配置客户端,以增加令牌的最长生命周期。
如果 Kafka 客户端没有直接配置访问令牌,客户端会在 Kafka 会话发起授权服务器的过程中交换访问令牌的凭证。Kafka 客户端交换:
客户端 ID 和机密
客户端 ID、刷新令牌和(可选)Secret
5.4.9.5. OAuth 2.0 客户端验证流程
OAuth 2.0 身份验证流取决于底层 Kafka 客户端和 Kafka 代理配置。该流必须由使用的授权服务器支持。
Kafka 代理监听程序配置决定客户端如何使用访问令牌进行身份验证。客户端可以传递客户端 ID 和机密,以请求访问令牌。
如果侦听器配置为使用 PLAIN 身份验证,客户端可以通过客户端 ID 和 secret 或用户名和访问令牌进行身份验证。这些值作为 PLAIN 机制
username
和
password
属性传递。
侦听器配置支持以下令牌验证选项:
您可以基于 JWT 签名检查和本地令牌内省使用快速本地令牌验证,而无需联系授权服务器。授权服务器提供带有公共证书的 JWKS 端点,用于验证令牌中的签名。
您可以使用授权服务器提供的令牌内省端点调用。每次建立新的 Kafka 代理连接时,代理都会将从客户端接收的访问令牌传递给授权服务器。Kafka 代理检查响应,以确认令牌是否有效。
授权服务器可能只允许使用不透明的访问令牌,这意味着无法进行本地令牌验证。
还可以为以下类型的身份验证配置 Kafka 客户端凭证:
使用之前生成的长期访问令牌直接进行本地访问
与授权服务器联系,以获取要发出的新访问令牌(使用客户端 ID 和 secret 或刷新令牌)
5.4.9.6. 配置 OAuth 2.0 身份验证
OAuth 2.0 用于在 Kafka 客户端和 AMQ Streams 组件间交互。
要将 OAuth 2.0 用于 AMQ Streams,您必须:
为 AMQ Streams 集群和 Kafka 客户端配置 OAuth 2.0 授权服务器
使用配置为使用 OAuth 2.0 的 Kafka 代理监听程序部署或更新 Kafka 集群
更新基于 Java 的 Kafka 客户端以使用 OAuth 2.0
5.4.9.6.1. 将 Red Hat Single Sign-On 配置为 OAuth 2.0 授权服务器
此流程描述了如何将 Red Hat Single Sign-On 部署为授权服务器并进行配置以与 AMQ Streams 集成。
授权服务器为身份验证和授权提供了一个中央点,以及用户、客户端和权限的管理。Red Hat Single Sign-On 的概念为
realm
,其中一个 realm 代表一组单独的用户、客户端、权限和其他配置。您可以使用默认
主域
,或创建新域。每个 realm 会公开自己的 OAuth 2.0 端点,这意味着应用程序客户端和应用服务器都需要使用相同的域。
要将 OAuth 2.0 与 AMQ Streams 搭配使用,请使用部署 Red Hat Single Sign-On 来创建和管理身份验证域。
如果您已经部署了 Red Hat Single Sign-On,您可以跳过部署步骤并使用您的当前部署。
您将需要熟悉使用红帽单点登录。
有关安装和管理说明,请参阅:
服务器安装和配置指南
服务器管理指南
AMQ Streams 和 Kafka 正在运行
对于 Red Hat Single Sign-On 部署:
检查
Red Hat Single Sign-On 支持的配置
Install Red Hat Single Sign-On.
您可以从 ZIP 文件或使用 RPM 安装。
登录到 Red Hat Single Sign-On Admin 控制台,为 AMQ Streams 创建 OAuth 2.0 策略。
当您部署 Red Hat Single Sign-On 时会提供登录信息。
创建并启用一个域。
您可以使用现有的 master 域。
如果需要,调整域的会话和令牌超时。
创建名为
kafka-broker
的客户端。
在
Settings
选项卡中设置:
访问 Type
to
Confidential
Standard Flow Enabled
为
OFF
为这个客户端禁用 Web 登录
启用服务帐户
到
ON
,以允许此客户端在其自己的名称中进行身份验证
在继续操作前,点
Save
。
在
选项卡中,记录使用 AMQ Streams Kafka 集群配置的 secret。
对将连接到 Kafka 代理的任何应用程序客户端重复客户端创建步骤。
为每个新客户端创建一个定义。
您将在配置中使用名称作为客户端 ID。
5.4.9.6.2. 为 Kafka 代理配置 OAuth 2.0 支持
此流程描述了如何配置 Kafka 代理,以便代理监听程序可以使用授权服务器使用 OAuth 2.0 身份验证。
我们建议通过配置 TLS 监听器在加密接口中使用 OAuth 2.0。不建议使用普通的监听程序。
使用支持您选择的授权服务器的属性配置 Kafka 代理,以及您要实现的授权类型。
有关 Kafka 代理监听程序的配置和身份验证的更多信息,请参阅:
OAuth 2.0 验证机制
有关监听器配置中使用的属性的描述,请参阅:
OAuth 2.0 Kafka 代理配置
AMQ Streams 和 Kafka 正在运行
已部署了 OAuth 2.0 授权服务器
在
server.properties
文件中配置 Kafka 代理监听程序配置。
例如,使用 OAUTHBEARER 机制:
sasl.enabled.mechanisms=OAUTHBEARER
listeners=CLIENT://0.0.0.0:9092
listener.security.protocol.map=CLIENT:SASL_PLAINTEXT
listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER
sasl.mechanism.inter.broker.protocol=OAUTHBEARER
inter.broker.listener.name=CLIENT
listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ;
listener.name.client.oauthbearer.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
将代理连接设置配置为
listener.name.client.oauthbearer.sasl.jaas.config
的一部分。
此处的示例显示了连接配置选项。
如果证书主机名与访问 URL 主机名不匹配,您可以关闭证书主机名验证:
oauth.ssl.endpoint.identification.algorithm=""
检查可确保客户端与授权服务器的连接是 authentic。您可能希望在非生产环境中关闭验证。
根据您选择的身份验证流程配置额外的属性。
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
# ...
oauth.token.endpoint.uri="https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/token" \ 1
oauth.custom.claim.check="@.custom == 'custom-value'" \ 2
oauth.scope="SCOPE" \ 3
oauth.check.audience="true" \ 4
oauth.audience="AUDIENCE" \ 5
oauth.valid.issuer.uri="https://https://AUTH-SERVER-ADDRESS/auth/REALM-NAME" \ 6
oauth.client.id="kafka-broker" \ 7
oauth.client.secret="kafka-broker-secret" \ 8
oauth.refresh.token="REFRESH-TOKEN-FOR-KAFKA-BROKERS" \ 9
oauth.access.token="ACCESS-TOKEN-FOR-KAFKA-BROKERS" ; 10
oauth.connect.timeout.seconds=60 11
oauth.read.timeout.seconds=60 12
oauth.groups.claim="$.groups" 13
oauth.groups.claim.delimiter="," 14
-
1
-
授权服务器的 OAuth 2.0 令牌端点 URL。对于生产环境,始终使用
https://
urls。当使用
KeycloakRBACAuthorizer
或启用了 OAuth 2.0 的监听程序时,需要用于 Inter-broker 间通信。
(可选
)自定义声明检查
。JsonPath 过滤器查询,在验证期间将额外的自定义规则应用到 JWT 访问令牌。如果访问令牌不包含必要的数据,则会被拒绝。使用
introspection
端点方法时,自定义检查将应用到内省端点响应 JSON。
(可选)传递给令牌端点
的范围
参数。获取访问令牌进行代理身份验证时使用的
范围
。它还在使用
clientId
和
secret
的 PLAIN 客户端验证中用于 OAuth 2.0 的客户端名称。这只会影响获取令牌的能力,以及令牌的内容,具体取决于授权服务器。它不会受到监听程序影响令牌验证规则。
(可选)
检查
。如果您的授权服务器
提供了一个ud (udi
ence)申索,并且希望强制执行听众检查,则将
ouath.check.audience
设置为
true
。受众检查确定令牌的预期接收者。因此,Kafka 代理将拒绝在其
aud
声明中没有
clientId
的令牌。默认为
false
。
(可选)
传递给
令牌端点的使用者参数。在获取用于代理身份验证的访问令牌时,需要使用
使用者
。它还在使用
clientId
和
secret
的 PLAIN 客户端验证中用于 OAuth 2.0 的客户端名称。这只会影响获取令牌的能力,以及令牌的内容,具体取决于授权服务器。它不会受到监听程序影响令牌验证规则。
有效的签发者 URI。只有此签发者发布的访问令牌才会被接受。(始终必需。)
Kafka 代理配置的客户端 ID,在所有代理中都相同。这是
使用授权服务器注册的客户端,作为
kafka-broker
。当内省端点用于令牌验证时,或者使用
KeycloakRBACAuthorizer
时是必需的。
Kafka 代理配置的 secret,适用于所有代理。当代理必须向授权服务器进行身份验证时,需要指定客户端 secret、访问令牌或刷新令牌。
(可选) Kafka 代理的长期刷新令牌。
(可选) Kafka 代理的长期访问令牌。
(可选)连接到授权服务器时连接超时(以秒为单位)。默认值为 60。
(可选)连接到授权服务器时读取超时(以秒为单位)。默认值为 60。
JsonPath 查询,用于从 JWT 令牌或内省端点响应中提取组信息。默认情况下不设置。这可以由自定义授权者用来根据用户组做出授权决策。
以单个分隔字符串返回时用于解析组信息的分隔符。默认值为 ','(comma)。
根据您如何应用 OAuth 2.0 身份验证以及所使用的授权服务器类型,添加额外的配置设置:
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
# ...
oauth.check.issuer=false \ 1
oauth.fallback.username.claim="CLIENT-ID" \ 2
oauth.fallback.username.prefix="CLIENT-ACCOUNT" \ 3
oauth.valid.token.type="bearer" \ 4
oauth.userinfo.endpoint.uri="https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/userinfo" ; 5
-
1
-
如果您的授权服务器不提供
声明
,则无法执行签发者检查。在这种情况下,将
oauth.check.issuer
设置为
false
,且不指定
oauth.valid.issuer.uri
。默认为
true
。
授权服务器可能无法提供单一属性来识别常规用户和客户端。当客户端在其自己的名称中进行身份验证时,该服务器可能会提供
客户端 ID
。当用户使用用户名和密码进行身份验证时,若要获取刷新令牌或访问令牌,服务器在客户端 ID 之外可能会提供
用户名
属性。如果主用户 ID 属性不可用,则使用此回退选项指定要使用的用户名声明(attribute)。
在适用
oauth.fallback.username.claim
时,可能需要防止用户名声明值和回退用户名声明之间的名称冲突。例如存在名为
producer
的客户端,但存在一个名为
producer
的普通用户。为了区分这两者,您可以使用此属性为客户端的用户 ID 添加前缀。
(仅在使用
oauth.introspection.endpoint.uri
时,取决于您使用的授权服务器,内省端点可能会或不返回
令牌类型
属性,或者可能包含不同的值。您可以指定来自内省端点要包含的响应的有效令牌类型值。
(仅在使用
oauth.introspection.endpoint.uri
)时,授权服务器可以按此类方式进行配置或实施,而不在内省端点响应中提供任何可识别的信息。要获取用户 ID,您可以将
userinfo
端点的 URI 配置为回退。
oauth.fallback.username.claim
、
oauth.fallback.username.claim
和
oauth.fallback.username.prefix
设置应用到
userinfo
端点的响应。
5.4.9.6.3. 将 Kafka Java 客户端配置为使用 OAuth 2.0
这个步骤描述了如何配置 Kafka producer 和消费者 API,以使用 OAuth 2.0 与 Kafka 代理交互。
将客户端回调插件添加到您的
pom.xml
文件中,并配置系统属性。
AMQ Streams 和 Kafka 正在运行
OAuth 2.0 授权服务器被部署并为 OAuth 访问 Kafka 代理配置
为 OAuth 2.0 配置 Kafka 代理
将支持 OAuth 2.0 的客户端程序库添加到 Kafka 客户端的
pom.xml
文件中:
<dependency>
<groupId>io.strimzi</groupId>
<artifactId>kafka-oauth-client</artifactId>
<version>0.10.0.redhat-00014</version>
</dependency>
为回调配置系统属性:
System.setProperty(ClientConfig.OAUTH_TOKEN_ENDPOINT_URI, “https://<auth-server-address>/auth/realms/master/protocol/openid-connect/token”); 1
System.setProperty(ClientConfig.OAUTH_CLIENT_ID, "<client_name>"); 2
System.setProperty(ClientConfig.OAUTH_CLIENT_SECRET, "<client_secret>"); 3
System.setProperty(ClientConfig.OAUTH_SCOPE, "<scope_value>") 4
System.setProperty(ClientConfig.OAUTH_AUDIENCE, "<audience_value") 5
-
1
-
授权服务器令牌端点的 URI。
客户端 ID,这是在授权服务器中创建
客户端
时使用的名称。
在授权服务器中创建
客户端
secret。
(可选)从令牌端点请求令牌的
范围
。授权服务器可能需要客户端指定范围。
(可选)从令牌端点请求令牌的
听众
。授权服务器可能需要客户端指定受众。
在 Kafka 客户端配置中,在 TLS 加密连接中启用
OAUTHBEARER
或
PLAIN
机制。
5.4.10. 使用基于 OAuth 2.0 令牌的授权
如果您将 OAuth 2.0 与 Red Hat Single Sign-On 用于基于令牌的身份验证搭配使用,您还可以使用 Red Hat Single Sign-On 配置授权规则来限制客户端对 Kafka 代理的访问。身份验证建立了用户的身份。授权决定该用户的访问权限级别。
AMQ Streams 支持通过 Red Hat Single Sign-On
Authorization Services
使用基于 OAuth 2.0 令牌的授权,它允许您集中管理安全策略和权限。
Red Hat Single Sign-On 中定义的安全策略和权限用于授予对 Kafka 代理上资源的访问权限。用户和客户端与允许对 Kafka 代理执行特定操作的策略进行匹配。
Kafka 允许所有用户默认对代理进行完全访问,同时还提供
AclAuthorizer
插件来配置基于 Access Control Lists (ACL)的授权。
zookeeper 存储了基于
用户名
授予或拒绝对资源的访问的 ACL 规则。但是,红帽单点登录基于 OAuth 2.0 令牌的授权在您希望实现对 Kafka 代理的访问控制方面具有更大的灵活性。另外,您可以将 Kafka 代理配置为使用 OAuth 2.0 授权和 ACL。
使用基于 OAuth 2.0 令牌的身份验证
Kafka 授权
Red Hat Single Sign-On 文档
AMQ Streams 中的 OAuth 2.0 授权使用红帽单点登录服务器授权服务 REST 端点,通过在特定用户上应用定义的安全策略来扩展基于令牌的身份验证,并为该用户提供授予不同资源的权限列表。策略使用角色和组将权限与用户匹配。OAuth 2.0 授权根据从 Red Hat Single Sign-On Authorization Services 用户获得的授予者列表在本地强制实施权限。
5.4.10.1.1. Kafka 代理自定义授权器
AMQ Streams 提供了 Red Hat Single Sign-On
authorizer
(
KeycloakRBACAuthorizer
)。要使用由 Red Hat Single Sign-On 提供的授权服务的 Red Hat Single Sign-On REST 端点,您可以在 Kafka 代理上配置自定义授权器。
授权程序根据需要从授权服务器获取授予权限的列表,并在 Kafka Broker 上本地强制实施授权,为每个客户端请求做出快速授权决策。
5.4.10.2. 配置 OAuth 2.0 授权支持
这个步骤描述了如何使用 Red Hat Single Sign-On Authorization Services 将 Kafka 代理配置为使用 OAuth 2.0 授权服务。
考虑某些用户所需的访问权限或希望限制某些用户。您可以结合使用 Red Hat Single Sign-On
组
、
角色
、
客户端
和用户
,在 Red Hat Single Sign-On 中配置访问权限。
通常,组用于根据机构部门或地理位置匹配用户。和角色用于根据用户的功能匹配。
使用红帽单点登录,您可以在 LDAP 中存储用户和组,而客户端和角色不能以这种方式存储。存储和访问用户数据可能是您选择配置授权策略的原因。
无论 Kafka 代理上实施的授权是什么,
超级用户
始终对 Kafka 代理没有限制的访问。
AMQ Streams 必须通过 Red Hat Single Sign-On 配置为使用 OAuth 2.0 进行
基于令牌的身份验证
。设置授权时,您使用相同的红帽单点登录服务器端点。
您需要了解如何管理 Red Hat Single Sign-On Authorization Services 的策略和权限,如
Red Hat Single Sign-On 文档
中所述。
访问 Red Hat Single Sign-On Admin Console,或使用 Red Hat Single Sign-On Admin CLI 为设置 OAuth 2.0 身份验证时创建的 Kafka 代理客户端启用授权服务。
使用授权服务定义客户端的资源、授权范围、策略和权限。
通过分配角色和组,将权限绑定到用户和客户端。
将 Kafka 代理配置为使用 Red Hat Single Sign-On 授权。
在 Kafka
server.properties
配置文件中添加以下内容,以便在 Kafka 中安装授权器:
authorizer.class.name=io.strimzi.kafka.oauth.server.authorizer.KeycloakRBACAuthorizer
principal.builder.class=io.strimzi.kafka.oauth.server.authorizer.JwtKafkaPrincipalBuilder
添加 Kafka 代理的配置以访问授权服务器和授权服务。
在此我们显示作为额外属性添加到
server.properties
的示例配置,但您也可以使用大写或大写的命名规则将它们定义为环境变量。
strimzi.authorization.token.endpoint.uri="https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/token" 1
strimzi.authorization.client.id="kafka" 2
-
1
-
Red Hat Single Sign-On 的 OAuth 2.0 令牌端点 URL。对于生产环境,始终使用
https://
urls。
在启用了授权服务的 Red Hat Single Sign-On 中的 OAuth 2.0 客户端定义的客户端 ID。通常,
kafka
用作 ID。
(可选)为特定 Kafka 集群添加配置。
strimzi.authorization.kafka.cluster.name="kafka-cluster" 1
-
1
-
特定 Kafka 集群的名称。名称用于目标权限,从而在同一 Red Hat Single Sign-On realm 中管理多个集群。默认值为
kafka-cluster
。
(可选)分隔到简单授权。
strimzi.authorization.delegate.to.kafka.acl="false" 1
-
1
-
如果 Red Hat Single Sign-On Authorization Services 策略无法访问,将授权委派给 Kafka
AclAuthorizer
。默认值为
false
。
(可选)为授权服务器添加 TLS 连接的配置。
strimzi.authorization.ssl.truststore.location=<path-to-truststore> 1
strimzi.authorization.ssl.truststore.password=<my-truststore-password> 2
strimzi.authorization.ssl.truststore.type=JKS 3
strimzi.authorization.ssl.secure.random.implementation=SHA1PRNG 4
strimzi.authorization.ssl.endpoint.identification.algorithm=HTTPS 5
-
1
-
包含证书的 truststore 的路径。
truststore 的密码。
truststore 类型。如果没有设置,则使用默认的 Java 密钥存储类型。
随机数生成器实施。如果没有设置,则使用 Java 平台 SDK 默认。
主机名验证。如果设置为空字符串,则会关闭主机名验证。如果没有设置,则默认值为
HTTPS
,它为服务器证书强制实施主机名验证。
(可选)配置授权服务器的刷新。授予刷新任务通过枚举活跃令牌并为每个请求最新的授权来工作。
strimzi.authorization.grants.refresh.period.seconds="120" 1
strimzi.authorization.grants.refresh.pool.size="10" 2
-
1
-
指定授权服务器刷新列表的频率(默认为每分钟运行一次)。要打开刷新以调试目的,请将 设置为
"0"
。
指定授予刷新作业使用的线程池大小(并行级别)。默认值为
"5"
。
通过以客户端或具有特定角色的用户访问 Kafka 代理来验证配置的权限,确保它们具有必要的访问权限,或者没有应该具有访问权限。
开源策略代理(OPA)是一个开源策略引擎。您可以将 OPA 与 AMQ Streams 集成,作为基于策略的授权机制,允许在 Kafka 代理上进行客户端操作。
从客户端发出请求时,OPA 将根据为 Kafka 访问定义的策略评估请求,然后允许或拒绝请求。
红帽不支持 OPA 服务器。
打开 Policy Agent 网站
在将 OPA 与 AMQ Streams 集成前,请考虑如何定义策略以提供精细的访问控制。
您可以为 Kafka 集群、消费者组和主题定义访问控制。例如,您可以定义一个授权策略,允许从制作者客户端写入到特定代理主题的访问。
因此,策略可能会指定:
与制作者客户端关联的用户
主体和
主机地址
允许客户端
的操作
策略适用的
资源类型
(
topic
)和
资源名称
允许或拒绝决策将写入到策略中,并根据提供的请求和客户端识别数据提供响应。
在我们的示例中,生产者客户端必须满足策略才被允许写入该主题。
要启用 Kafka 访问 OPA 策略引擎以查询访问控制策略,您可以在您的 Kafka
server.properties
文件中配置自定义 OPA authorizer 插件 (
kafka-authorizer-opa-
VERSION
.jar
)。
客户端发出请求时,OPA 策略引擎将通过指定的 URL 地址和 REST 端点来查询 OPA 策略引擎,该端点必须是定义的策略的名称。
该插件提供了客户端请求的详细信息 - 用户主体、操作和资源 - 以 JSON 格式针对策略进行检查。详细信息将包括客户端的唯一身份;例如,使用 TLS 身份验证时将区分名称与客户端证书进行区分。
OPA 使用数据向插件提供响应 -
true
或
false
- 允许或拒绝请求。
这个步骤描述了如何配置 Kafka 代理以使用 OPA 授权。
考虑某些用户所需的访问权限或希望限制某些用户。您可以使用
用户和
Kafka
资源
组合来定义 OPA 策略。
可以设置 OPA 从 LDAP 数据源加载用户信息。
无论 Kafka 代理上实施的授权是什么,
超级用户
始终对 Kafka 代理没有限制的访问。
必须有一个 OPA 服务器可用于连接。
Kafka 的 OPA authorizer 插件
编写授权客户端请求以执行 Kafka 代理操作所需的 OPA 策略。
请参阅
定义 OPA 策略
。
现在,将 Kafka 代理配置为使用 OPA。
为 Kafka 安装 OPA 授权程序插件
。
请参阅连接到 OPA
。
确保插件文件包含在 Kafka classpath 中。
在 Kafka
server.properties
配置文件中添加以下内容以启用 OPA 插件:
authorizer.class.name: com.bisnode.kafka.authorization.OpaAuthorizer
为 Kafka 代理的
server.properties
添加进一步配置,以访问 OPA 策略引擎和策略。
opa.authorizer.url=https://OPA-ADDRESS/allow 1
opa.authorizer.allow.on.error=false 2
opa.authorizer.cache.initial.capacity=50000 3
opa.authorizer.cache.maximum.size=50000 4
opa.authorizer.cache.expire.after.seconds=600000 5
super.users=User:alice;User:bob 6
-
1
-
(必需)策略将查询授权器插件的 OAuth 2.0 令牌端点 URL。在本例中,策略名为
allow
。
标志指定在授权器插件无法与 OPA 策略引擎连接时,默认是否允许或拒绝客户端访问。
本地缓存字节数的初始容量。使用缓存,以便插件不必为每个请求查询 OPA 策略引擎。
本地缓存字节数的最大容量。
通过从 OPA 策略引擎重新加载本地缓存来刷新时间(毫秒)。
被视为超级用户的用户主体列表,以便在不查询 Open Policy Agent 策略的情况下始终允许它们。
有关身份验证和授权选项的信息,请参阅
Open Policy Agent 网站
。
使用具有且没有正确授权的客户端访问 Kafka 代理来验证配置的权限。
Kafka 代理使用 Log4j 作为其日志记录基础架构。默认情况下,日志配置从
log4j.properties
配置文件中读取,该文件应放在
/opt/kafka/config/
目录中或 classpath 中。可以使用 Java 属性
log4j.configuration
来更改配置文件的位置和名称,该配置可使用
KAFKA_LOG4J_OPTS
环境变量传递给 Kafka:
su - kafka
export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:/my/path/to/log4j.config"; /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
有关 Log4j 配置的更多信息,请参阅
Log4j manual
。
5.4.12.1. 为 Kafka 代理日志记录器动态更改日志级别
Kafka 代理日志记录由每个代理中的多个
broker loggers
提供。您可以动态更改代理日志记录器的日志记录级别,而无需重启代理。增加日志返回的详细级别 - 从
INFO
变为
DEBUG
,例如:在 Kafka 集群中调查性能问题时很有用。
代理日志记录器也可以动态地重置为其默认日志记录级别。
AMQ Streams 安装在主机上
zookeeper 和 Kafka 正在运行
切换到
kafka
用户:
su - kafka
使用
kafka-configs.sh
工具列出代理的所有代理日志记录器:
/opt/kafka/bin/kafka-configs.sh --bootstrap-server <broker_address> --describe --entity-type broker-loggers --entity-name BROKER-ID
例如,对于代理
0
:
/opt/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type broker-loggers --entity-name 0
这会返回每个日志记录器的日志记录级别:
TRACE
,
DEBUG
,
INFO
,
WARN
,
ERROR
, 或
FATAL
。例如:
kafka.controller.ControllerChannelManager=INFO sensitive=false synonyms={}
kafka.log.TimeIndex=INFO sensitive=false synonyms={}
更改一个或多个代理日志记录器的日志级别。使用
--alter
和
--add-config
选项,并将每个日志记录器及其级别指定为双引号中的逗号分隔列表。
/opt/kafka/bin/kafka-configs.sh --bootstrap-server <broker_address> --alter --add-config "LOGGER-ONE=NEW-LEVEL,LOGGER-TWO=NEW-LEVEL" --entity-type broker-loggers --entity-name BROKER-ID
例如,对于代理
0
:
/opt/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config "kafka.controller.ControllerChannelManager=WARN,kafka.log.TimeIndex=WARN" --entity-type broker-loggers --entity-name 0
如果成功,则返回: