Kafka uses a properties file to store static configuration. The recommended location for the configuration file is /opt/kafka/config/server.properties. The configuration file has to be readable by the kafka user.
AMQ Streams ships an example configuration file that highlights various basic and advanced features of the product. It can be found under config/server.properties in the AMQ Streams installation directory.
This chapter explains the most important configuration options. For a complete list of supported Kafka broker configuration options, see Appendix A, Broker configuration parameters.
Kafka brokers need ZooKeeper to store some parts of their configuration as well as to coordinate the cluster (for example, to decide which node is a leader for which partition). Connection details for the ZooKeeper cluster are stored in the configuration file. The zookeeper.connect field contains a comma-separated list of hostnames and ports of members of the ZooKeeper cluster.
For example:
zookeeper.connect=zoo1.my-domain.com:2181,zoo2.my-domain.com:2181,zoo3.my-domain.com:2181
Kafka will use these addresses to connect to the ZooKeeper cluster. With this configuration, all Kafka znodes will be created directly in the root of the ZooKeeper database. Therefore, such a ZooKeeper cluster can be used only for a single Kafka cluster. To configure multiple Kafka clusters to use a single ZooKeeper cluster, specify a base (prefix) path at the end of the ZooKeeper connection string in the Kafka configuration file:
zookeeper.connect=zoo1.my-domain.com:2181,zoo2.my-domain.com:2181,zoo3.my-domain.com:2181/my-cluster-1
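A second Kafka cluster sharing the same ZooKeeper ensemble would then use a different base path (the suffix my-cluster-2 below is purely illustrative):
zookeeper.connect=zoo1.my-domain.com:2181,zoo2.my-domain.com:2181,zoo3.my-domain.com:2181/my-cluster-2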
Kafka brokers can be configured to use multiple listeners. Each listener can be used to listen on a different port or network interface and can have a different configuration. Listeners are configured in the listeners property in the configuration file. The listeners property contains a list of listeners, with each listener configured as <listenerName>://<hostname>:<port>. When the hostname value is empty, Kafka will use java.net.InetAddress.getCanonicalHostName() as the hostname. The following example shows how multiple listeners might be configured:
listeners=INT1://:9092,INT2://:9093,REPLICATION://:9094
When a Kafka client wants to connect to a Kafka cluster, it first connects to a bootstrap server. The bootstrap server is one of the cluster nodes. It will provide the client with a list of all the other brokers which are part of the cluster, and the client will connect to them individually. By default, the bootstrap server provides the client with a list of nodes based on the listeners field.
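For example, a client only needs to list a subset of the brokers in its bootstrap.servers configuration; the remaining brokers are discovered from the cluster metadata (the hostnames below are illustrative):
bootstrap.servers=my-broker-1.my-domain.com:9092,my-broker-2.my-domain.com:9092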
Advertised listeners
It is possible to give the client a different set of addresses than given in the listeners property. This is useful in situations when additional network infrastructure, such as a proxy, is between the client and the broker, or when an external DNS name should be used instead of an IP address. Here, the broker allows defining the advertised addresses of the listeners in the advertised.listeners configuration property. This property has the same format as the listeners property. The following example shows how to configure advertised listeners:
listeners=INT1://:9092,INT2://:9093
advertised.listeners=INT1://my-broker-1.my-domain.com:1234,INT2://my-broker-1.my-domain.com:1234
The names of the listeners have to match the names of the listeners from the listeners property.
Inter-broker listeners
When the cluster has replicated topics, the brokers responsible for such topics need to communicate with each other in order to replicate the messages in those topics. When multiple listeners are configured, the configuration field inter.broker.listener.name can be used to specify the name of the listener which should be used for replication between brokers. For example:
inter.broker.listener.name=REPLICATION
Apache Kafka stores all records it receives from producers in commit logs. The commit logs contain the actual data, in the form of records, that Kafka needs to deliver. These are not the application log files which record what the broker is doing.
Log directories
You can configure log directories using the log.dirs property to store commit logs in one or multiple log directories. It should be set to the /var/lib/kafka directory created during installation:
log.dirs=/var/lib/kafka
For performance reasons, you can configure log.dirs to multiple directories and place each of them on a different physical device to improve disk I/O performance. For example:
log.dirs=/var/lib/kafka1,/var/lib/kafka2,/var/lib/kafka3
Broker ID is a unique identifier for each broker in the cluster. You can assign an integer greater than or equal to 0 as broker ID. The broker ID is used to identify the brokers after restarts or crashes and it is therefore important that the id is stable and does not change over time. The broker ID is configured in the broker properties file:
broker.id=1
This procedure describes how to configure and run Kafka as a multi-node cluster.
Prerequisites
Running the cluster
For each Kafka broker in your AMQ Streams cluster:
Edit the /opt/kafka/config/server.properties Kafka configuration file as follows:
Set the broker.id field to 0 for the first broker, 1 for the second broker, and so on.
Configure the details for connecting to ZooKeeper in the zookeeper.connect option.
Configure the Kafka listeners.
Set the directories where the commit logs should be stored in the log.dirs option.
Here we see an example configuration for a Kafka broker:
broker.id=0
zookeeper.connect=zoo1.my-domain.com:2181,zoo2.my-domain.com:2181,zoo3.my-domain.com:2181
listeners=REPLICATION://:9091,PLAINTEXT://:9092
inter.broker.listener.name=REPLICATION
log.dirs=/var/lib/kafka
In a typical installation where each Kafka broker is running on identical hardware, only the broker.id configuration property will differ between each broker config.
Start the Kafka broker with the default configuration file.
su - kafka
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
Verify that the Kafka broker is running.
jcmd | grep Kafka
Verifying the brokers
Once all nodes of the cluster are up and running, verify that all nodes are members of the Kafka cluster by sending a dump command to one of the ZooKeeper nodes using the ncat utility. The command prints all Kafka brokers registered in ZooKeeper.
Send the dump command to check which brokers are registered:
echo dump | ncat zoo1.my-domain.com 2181
The output should contain all the Kafka brokers you just configured and started.
Example output from the ncat command for a Kafka cluster with 3 nodes:
SessionTracker dump:
org.apache.zookeeper.server.quorum.LearnerSessionTracker@28848ab9
ephemeral nodes dump:
Sessions with Ephemerals (3):
0x20000015dd00000:
        /brokers/ids/1
0x10000015dc70000:
        /controller
        /brokers/ids/0
0x10000015dc70001:
        /brokers/ids/2
Additional resources
By default, connections between ZooKeeper and Kafka are not authenticated. However, Kafka and ZooKeeper support Java Authentication and Authorization Service (JAAS), which can be used to set up authentication using Simple Authentication and Security Layer (SASL). ZooKeeper supports authentication using the DIGEST-MD5 SASL mechanism with locally stored credentials.
SASL authentication for ZooKeeper connections has to be configured in the JAAS configuration file. By default, Kafka will use the JAAS context named Client for connecting to ZooKeeper. The Client context should be configured in the /opt/kafka/config/jaas.conf file. The context has to enable the PLAIN SASL authentication, as in the following example:
Client {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="kafka"
    password="123456";
};
This procedure describes how to enable authentication using the SASL DIGEST-MD5 mechanism when connecting to ZooKeeper.
Prerequisites
Enabling SASL DIGEST-MD5 authentication
On all Kafka broker nodes, create or edit the /opt/kafka/config/jaas.conf JAAS configuration file and add the following context:
Client {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="<Username>"
    password="<Password>";
};
The username and password should be the same as configured in ZooKeeper. The following example shows the Client context:
Client {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="kafka"
    password="123456";
};
Restart all Kafka broker nodes one by one. To pass the JAAS configuration to the Kafka brokers, use the KAFKA_OPTS environment variable.
su - kafka
export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/jaas.conf"
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
Additional resources
Authorization in Kafka brokers is implemented using authorizer plugins.
In this section, we describe how to use the AclAuthorizer plugin provided with Kafka.
Alternatively, you can use your own authorization plugins. For example, if you are using OAuth 2.0 token-based authentication, you can use OAuth 2.0 authorization.
Authorizer plugins, including AclAuthorizer, are enabled through the authorizer.class.name property:
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
A fully-qualified name is required for the chosen authorizer. For AclAuthorizer, the fully-qualified name is kafka.security.auth.SimpleAclAuthorizer.
AclAuthorizer uses ACL rules to manage access to Kafka brokers.
ACL rules are defined in the format:
Principal P is allowed / denied operation O on Kafka resource R from host H
For example, a rule might be set so that user John can view the topic comments from host 127.0.0.1.
Host is the IP address of the machine that John is connecting from.
In most cases, the user is a producer or consumer application:
Consumer01 can write to the consumer group accounts from host 127.0.0.1
If ACL rules are not present
If ACL rules are not present for a given resource, all actions are denied. This behavior can be changed by setting the property allow.everyone.if.no.acl.found to true in the Kafka configuration file /opt/kafka/config/server.properties.
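For example, to allow access when no matching ACL rule exists, add:
allow.everyone.if.no.acl.found=true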
A principal represents the identity of a user. The format of the ID depends on the authentication mechanism used by clients to connect to Kafka:
User:ANONYMOUS when connected without authentication.
User:<username> when connected using simple authentication mechanisms, such as PLAIN or SCRAM. For example User:admin or User:user1.
User:<DistinguishedName> when connected using TLS client authentication. For example User:CN=user1,O=MyCompany,L=Prague,C=CZ.
User:<Kerberos username> when connected using Kerberos.
The DistinguishedName is the distinguished name from the client certificate.
The Kerberos username is the primary part of the Kerberos principal, which is used by default when connecting using Kerberos. You can use the sasl.kerberos.principal.to.local.rules property to configure how the Kafka principal is built from the Kerberos principal.
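As an illustrative sketch (the realm name EXAMPLE.COM and the rule itself are assumptions, not part of the original text), a mapping rule might strip the realm so that a Kerberos principal such as user1@EXAMPLE.COM maps to the Kafka principal user1, with unmatched principals falling through to the DEFAULT rule:
sasl.kerberos.principal.to.local.rules=RULE:[1:$1@$0](.*@EXAMPLE.COM)s/@.*//,DEFAULT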
To use authorization, you need to have authentication enabled and used by your clients. Otherwise, all connections will have the principal User:ANONYMOUS.
For more information on methods of authentication, see Encryption and authentication.
Super users are allowed to take all actions regardless of the ACL rules.
Super users are defined in the Kafka configuration file using the property super.users.
For example:
super.users=User:admin,User:operator
When authorization is enabled, it is applied to all listeners and all connections. This includes the inter-broker connections used for replication of data between brokers. If enabling authorization, therefore, ensure that you use authentication for inter-broker connections and give the users used by the brokers sufficient rights. For example, if authentication between brokers uses the kafka-broker user, then the super user configuration must include the username: super.users=User:kafka-broker.
You can apply Kafka ACLs to these types of resource:
Topics
Consumer groups
The cluster
TransactionId
DelegationToken
AclAuthorizer authorizes operations on resources. Fields with X in the following table mark the supported operations for each resource.
Table 4.1. Supported operations for a resource
[Table not reproduced in this extract: it maps the operations (Read, Write, Create, Delete, Alter, Describe, ClusterAction, and so on) to the Topics, Consumer Groups, and Cluster resources.]
ACL rules are managed using the bin/kafka-acls.sh utility, which is provided as part of the Kafka distribution package.
Use kafka-acls.sh parameter options to add, list and remove ACL rules, and perform other functions.
The parameters require a double-hyphen convention, such as --add.
[Table not reproduced in this extract: it lists the kafka-acls.sh options with their type, description, and default value; for example, the --add action adds an ACL rule.]
This procedure describes how to enable the AclAuthorizer plugin for authorization in Kafka brokers.
Prerequisites
Procedure
Edit the /opt/kafka/config/server.properties Kafka configuration file to use the AclAuthorizer.
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
Additional resources
AclAuthorizer uses Access Control Lists (ACLs), which define a set of rules describing what users can and cannot do.
This procedure describes how to add ACL rules when using the AclAuthorizer plugin in Kafka brokers.
Rules are added using the kafka-acls.sh utility and stored in ZooKeeper.
Prerequisites
Procedure
Run kafka-acls.sh with the --add option.
Examples:
Allow user1 and user2 access to read from myTopic using the MyConsumerGroup consumer group.
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zoo1.my-domain.com:2181 --add --operation Read --topic myTopic --allow-principal User:user1 --allow-principal User:user2
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zoo1.my-domain.com:2181 --add --operation Describe --topic myTopic --allow-principal User:user1 --allow-principal User:user2
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zoo1.my-domain.com:2181 --add --operation Read --operation Describe --group MyConsumerGroup --allow-principal User:user1 --allow-principal User:user2
Deny user1 access to read myTopic from IP address host 127.0.0.1.
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zoo1.my-domain.com:2181 --add --operation Describe --operation Read --topic myTopic --group MyConsumerGroup --deny-principal User:user1 --deny-host 127.0.0.1
Add user1 as the consumer of myTopic with MyConsumerGroup.
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zoo1.my-domain.com:2181 --add --consumer --topic myTopic --group MyConsumerGroup --allow-principal User:user1
Additional resources
For more information about kafka-acls.sh options, see Section 4.7.1, “Simple ACL authorizer”.
This procedure describes how to list existing ACL rules when using the AclAuthorizer plugin in Kafka brokers.
Rules are listed using the kafka-acls.sh utility.
Prerequisites
Procedure
Run kafka-acls.sh with the --list option.
For example:
$ bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zoo1.my-domain.com:2181 --list --topic myTopic
Current ACLs for resource `Topic:myTopic`:
User:user1 has Allow permission for operations: Read from hosts: *
User:user2 has Allow permission for operations: Read from hosts: *
User:user2 has Deny permission for operations: Read from hosts: 127.0.0.1
User:user1 has Allow permission for operations: Describe from hosts: *
User:user2 has Allow permission for operations: Describe from hosts: *
User:user2 has Deny permission for operations: Describe from hosts: 127.0.0.1
Additional resources
For more information about kafka-acls.sh options, see Section 4.7.1, “Simple ACL authorizer”.
This procedure describes how to remove ACL rules when using the AclAuthorizer plugin in Kafka brokers.
Rules are removed using the kafka-acls.sh utility.
Prerequisites
Procedure
Run kafka-acls.sh with the --remove option.
Examples:
Remove the ACL allowing user1 and user2 access to read from myTopic using the MyConsumerGroup consumer group.
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zoo1.my-domain.com:2181 --remove --operation Read --topic myTopic --allow-principal User:user1 --allow-principal User:user2
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zoo1.my-domain.com:2181 --remove --operation Describe --topic myTopic --allow-principal User:user1 --allow-principal User:user2
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zoo1.my-domain.com:2181 --remove --operation Read --operation Describe --group MyConsumerGroup --allow-principal User:user1 --allow-principal User:user2
Remove the ACL adding user1 as the consumer of myTopic with MyConsumerGroup.
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zoo1.my-domain.com:2181 --remove --consumer --topic myTopic --group MyConsumerGroup --allow-principal User:user1
Remove the ACL denying user1 access to read myTopic from IP address host 127.0.0.1.
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zoo1.my-domain.com:2181 --remove --operation Describe --operation Read --topic myTopic --group MyConsumerGroup --deny-principal User:user1 --deny-host 127.0.0.1
Additional resources
For more information about kafka-acls.sh options, see Section 4.7.1, “Simple ACL authorizer”.
For more information about enabling authorization, see Section 4.7.2, “Enabling authorization”.
When authentication is enabled between Kafka and ZooKeeper, you can use ZooKeeper Access Control List (ACL) rules to automatically control access to Kafka’s metadata stored in ZooKeeper.
Enforcement of ZooKeeper ACL rules is controlled by the zookeeper.set.acl property in the config/server.properties Kafka configuration file.
The property is disabled by default and enabled by setting it to true:
zookeeper.set.acl=true
If ACL rules are enabled, when a znode is created in ZooKeeper, only the Kafka user who created it can modify or delete it. All other users have read-only access.
Kafka sets ACL rules only for newly created ZooKeeper znodes. If the ACLs are only enabled after the first start of the cluster, the zookeeper-security-migration.sh tool can set ACLs on all existing znodes.
Confidentiality of data in ZooKeeper
Data stored in ZooKeeper includes:
Topic names and their configuration
Salted and hashed user credentials when SASL SCRAM authentication is used
However, ZooKeeper does not store any records sent and received using Kafka. The data stored in ZooKeeper is assumed to be non-confidential. If the data is to be regarded as confidential (for example because topic names contain customer IDs), the only option available for protection is isolating ZooKeeper on the network level and allowing access only to Kafka brokers.
This procedure describes how to enable ZooKeeper ACLs in Kafka configuration for a new Kafka cluster. Use this procedure only before the first start of the Kafka cluster. For enabling ZooKeeper ACLs in a cluster that is already running, see Section 4.8.3, “Enabling ZooKeeper ACLs in an existing Kafka cluster” .
Prerequisites
Procedure
Edit the /opt/kafka/config/server.properties Kafka configuration file to set the zookeeper.set.acl field to true on all cluster nodes.
zookeeper.set.acl=true
This procedure describes how to enable ZooKeeper ACLs in Kafka configuration for a Kafka cluster that is running. Use the zookeeper-security-migration.sh tool to set ZooKeeper ACLs on all existing znodes. The zookeeper-security-migration.sh tool is available as part of AMQ Streams, and can be found in the bin directory.
Prerequisites
Enabling the ZooKeeper ACLs
Edit the /opt/kafka/config/server.properties Kafka configuration file to set the zookeeper.set.acl field to true on all cluster nodes.
zookeeper.set.acl=true
Set the ACLs on all existing znodes using the zookeeper-security-migration.sh tool.
su - kafka
cd /opt/kafka
KAFKA_OPTS="-Djava.security.auth.login.config=./config/jaas.conf" ./bin/zookeeper-security-migration.sh --zookeeper.acl=secure --zookeeper.connect=<ZooKeeperURL>
For example:
su - kafka
cd /opt/kafka
KAFKA_OPTS="-Djava.security.auth.login.config=./config/jaas.conf" ./bin/zookeeper-security-migration.sh --zookeeper.acl=secure --zookeeper.connect=zoo1.my-domain.com:2181
exit
AMQ Streams supports encryption and authentication, which is configured as part of the listener configuration.
Encryption and authentication in Kafka brokers is configured per listener. For more information about Kafka listener configuration, see Section 4.2, “Listeners”.
Each listener in the Kafka broker is configured with its own security protocol. The configuration property listener.security.protocol.map defines which listener uses which security protocol. It maps each listener name to its security protocol. Supported security protocols are:
PLAINTEXT
SSL
SASL_PLAINTEXT
SASL_SSL
For the following listeners configuration:
listeners=INT1://:9092,INT2://:9093,REPLICATION://:9094
the listener.security.protocol.map might look like this:
listener.security.protocol.map=INT1:SASL_PLAINTEXT,INT2:SASL_SSL,REPLICATION:SSL
This would configure the listener INT1 to use unencrypted connections with SASL authentication, the listener INT2 to use encrypted connections with SASL authentication, and the REPLICATION interface to use TLS encryption (possibly with TLS client authentication). The same security protocol can be used multiple times. The following example is also a valid configuration:
listener.security.protocol.map=INT1:SSL,INT2:SSL,REPLICATION:SSL
Such a configuration would use TLS encryption and TLS authentication for all interfaces. The following chapters will explain in more detail how to configure TLS and SASL.
Kafka supports TLS for encrypting communication with Kafka clients.
In order to use TLS encryption and server authentication, a keystore containing private and public keys has to be provided. This is usually done using a file in the Java Keystore (JKS) format. A path to this file is set in the ssl.keystore.location property. The ssl.keystore.password property should be used to set the password protecting the keystore. For example:
ssl.keystore.location=/path/to/keystore/server-1.jks
ssl.keystore.password=123456
In some cases, an additional password is used to protect the private key. Any such password can be set using the ssl.key.password property.
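For example, the key password can be set alongside the keystore options (the path and passwords below are illustrative):
ssl.keystore.location=/path/to/keystore/server-1.jks
ssl.keystore.password=123456
ssl.key.password=123456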
Kafka is able to use keys signed by certification authorities as well as self-signed keys. Using keys signed by certification authorities should always be the preferred method. In order to allow clients to verify the identity of the Kafka broker they are connecting to, the certificate should always contain the advertised hostname(s) as its Common Name (CN) or in the Subject Alternative Names (SAN).
It is possible to use different SSL configurations for different listeners. All options starting with ssl. can be prefixed with listener.name.<NameOfTheListener>., where the name of the listener has to be always in lower case. This will override the default SSL configuration for that specific listener. The following example shows how to use different SSL configurations for different listeners:
listeners=INT1://:9092,INT2://:9093,REPLICATION://:9094
listener.security.protocol.map=INT1:SSL,INT2:SSL,REPLICATION:SSL
# Default configuration - will be used for listeners INT1 and INT2
ssl.keystore.location=/path/to/keystore/server-1.jks
ssl.keystore.password=123456
# Different configuration for listener REPLICATION
listener.name.replication.ssl.keystore.location=/path/to/keystore/server-1.jks
listener.name.replication.ssl.keystore.password=123456
Additional TLS configuration options
In addition to the main TLS configuration options described above, Kafka supports many options for fine-tuning the TLS configuration. For example, to enable or disable TLS/SSL protocols or cipher suites:
ssl.cipher.suites: list of enabled cipher suites.
ssl.enabled.protocols: list of enabled TLS/SSL protocols, for example TLSv1.2,TLSv1.1,TLSv1.
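For instance (the protocol and cipher suite values below are illustrative choices, not requirements), a broker could be restricted to TLSv1.2 and a single cipher suite:
ssl.enabled.protocols=TLSv1.2
ssl.cipher.suites=TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384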
For a complete list of supported Kafka broker configuration options, see Appendix A, Broker configuration parameters.
This procedure describes how to enable encryption in Kafka brokers.
Prerequisites
Procedure
Edit the /opt/kafka/config/server.properties Kafka configuration file on all cluster nodes for the following:
Change the listener.security.protocol.map field to specify the SSL protocol for the listener where you want to use TLS encryption.
Set the ssl.keystore.location option to the path to the JKS keystore with the broker certificate.
Set the ssl.keystore.password option to the password you used to protect the keystore.
For example:
listeners=UNENCRYPTED://:9092,ENCRYPTED://:9093,REPLICATION://:9094
listener.security.protocol.map=UNENCRYPTED:PLAINTEXT,ENCRYPTED:SSL,REPLICATION:PLAINTEXT
ssl.keystore.location=/path/to/keystore/server-1.jks
ssl.keystore.password=123456
Additional resources
For authentication, you can use:
TLS client authentication based on X.509 certificates on encrypted connections
A supported Kafka SASL (Simple Authentication and Security Layer) mechanism
OAuth 2.0 token-based authentication
TLS client authentication can be used only on connections which are already using TLS encryption. To use TLS client authentication, a truststore with public keys can be provided to the broker. These keys can be used to authenticate clients connecting to the broker. The truststore should be provided in Java Keystore (JKS) format and should contain public keys of the certification authorities. All clients with public and private keys signed by one of the certification authorities included in the truststore will be authenticated. The location of the truststore is set using the ssl.truststore.location field. In case the truststore is password protected, the password should be set in the ssl.truststore.password property. For example:
ssl.truststore.location=/path/to/keystore/server-1.jks
ssl.truststore.password=123456
Once the truststore is configured, TLS client authentication has to be enabled using the ssl.client.auth property. This property can be set to one of three different values:
none: TLS client authentication is switched off. (Default value)
requested: TLS client authentication is optional. Clients can authenticate with a TLS client certificate, but they are not required to.
required: All clients connecting to the listener have to authenticate using a TLS client certificate.
When a client authenticates using TLS client authentication, the authenticated principal name is the distinguished name from the client certificate. For example, a user with a certificate with a distinguished name CN=someuser will be authenticated with the following principal: CN=someuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown. When TLS client authentication is not used and SASL is disabled, the principal name will be ANONYMOUS.
SASL authentication is configured using Java Authentication and Authorization Service (JAAS). JAAS is also used for authentication of connections between Kafka and ZooKeeper. JAAS uses its own configuration file. The recommended location for this file is /opt/kafka/config/jaas.conf. The file has to be readable by the kafka user. When running Kafka, the location of this file is specified using the Java system property java.security.auth.login.config. This property has to be passed to Kafka when starting the broker nodes:
KAFKA_OPTS="-Djava.security.auth.login.config=/path/to/my/jaas.config" bin/kafka-server-start.sh
SASL authentication is supported both through plain unencrypted connections as well as through TLS connections. SASL can be enabled individually for each listener. To enable it, the security protocol in listener.security.protocol.map has to be either SASL_PLAINTEXT or SASL_SSL.
SASL authentication in Kafka supports several different mechanisms:
PLAIN
SCRAM-SHA-256 and SCRAM-SHA-512
GSSAPI
The PLAIN mechanism sends the username and password over the network in an unencrypted format. It should therefore only be used in combination with TLS encryption.
The SASL mechanisms are configured via the JAAS configuration file. Kafka uses the JAAS context named KafkaServer. After they are configured in JAAS, the SASL mechanisms have to be enabled in the Kafka configuration. This is done using the sasl.enabled.mechanisms property. This property contains a comma-separated list of enabled mechanisms:
sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512
In case the listener used for inter-broker communication is using SASL, the property sasl.mechanism.inter.broker.protocol has to be used to specify the SASL mechanism which it should use. For example:
sasl.mechanism.inter.broker.protocol=PLAIN
The username and password which will be used for the inter-broker communication have to be specified in the KafkaServer JAAS context using the fields username and password.
SASL PLAIN
To use the PLAIN mechanism, the usernames and passwords which are allowed to connect are specified directly in the JAAS context. The following example shows the context configured for SASL PLAIN authentication. The example configures three different users:
admin
user1
user2
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    user_admin="123456"
    user_user1="123456"
    user_user2="123456";
};
The JAAS configuration file with the user database should be kept in sync on all Kafka brokers. When SASL PLAIN is also used for inter-broker authentication, the username and password properties should be included in the JAAS context:
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="123456"
    user_admin="123456"
    user_user1="123456"
    user_user2="123456";
};
SASL SCRAM
SCRAM authentication in Kafka consists of two mechanisms: SCRAM-SHA-256 and SCRAM-SHA-512. These mechanisms differ only in the hashing algorithm used - SHA-256 versus the stronger SHA-512. To enable SCRAM authentication, the JAAS configuration file has to include the following configuration:
KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required;
};
When enabling SASL authentication in the Kafka configuration file, both SCRAM mechanisms can be listed. However, only one of them can be chosen for the inter-broker communication. For example:
sasl.enabled.mechanisms=SCRAM-SHA-256,SCRAM-SHA-512
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
User credentials for the SCRAM mechanism are stored in ZooKeeper. The kafka-configs.sh tool can be used to manage them. For example, run the following command to add user user1 with password 123456:
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'SCRAM-SHA-256=[password=123456],SCRAM-SHA-512=[password=123456]' --entity-type users --entity-name user1
To delete a user credential use:
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name user1
SASL GSSAPI
The SASL mechanism used for authentication using Kerberos is called GSSAPI. To configure Kerberos SASL authentication, the following configuration should be added to the JAAS configuration file:
KafkaServer {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/kafka_server.keytab"
    principal="kafka/[email protected]";
};
The domain name in the Kerberos principal has to be always in upper case. In addition to the JAAS configuration, the Kerberos service name needs to be specified in the sasl.kerberos.service.name property in the Kafka configuration:
sasl.enabled.mechanisms=GSSAPI
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.kerberos.service.name=kafka
Multiple SASL mechanisms
Kafka can use multiple SASL mechanisms at the same time. The different JAAS configurations can all be added to the same context:
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    user_admin="123456"
    user_user1="123456"
    user_user2="123456";

    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/kafka_server.keytab"
    principal="kafka/[email protected]";

    org.apache.kafka.common.security.scram.ScramLoginModule required;
};
When multiple mechanisms are enabled, clients will be able to choose the mechanism which they want to use.
This procedure describes how to enable TLS client authentication in Kafka brokers.
Prerequisites
Procedure
Edit the /opt/kafka/config/server.properties Kafka configuration file on all cluster nodes for the following:
Set the ssl.truststore.location option to the path to the JKS truststore with the certification authority of the user certificates.
Set the ssl.truststore.password option to the password you used to protect the truststore.
Set the ssl.client.auth option to required.
For example:
ssl.truststore.location=/path/to/truststore.jks
ssl.truststore.password=123456
ssl.client.auth=required
Additional resources
This procedure describes how to enable SASL PLAIN authentication in Kafka brokers.
Prerequisites
Procedure
Edit or create the /opt/kafka/config/jaas.conf JAAS configuration file. This file should contain all your users and their passwords. Make sure this file is the same on all Kafka brokers.
For example:
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    user_admin="123456"
    user_user1="123456"
    user_user2="123456";
};
Edit the /opt/kafka/config/server.properties Kafka configuration file on all cluster nodes for the following:
Change the listener.security.protocol.map field to specify the SASL_PLAINTEXT or SASL_SSL protocol for the listener where you want to use SASL PLAIN authentication.
Set the sasl.enabled.mechanisms option to PLAIN.
For example:
listeners=INSECURE://:9092,AUTHENTICATED://:9093,REPLICATION://:9094
listener.security.protocol.map=INSECURE:PLAINTEXT,AUTHENTICATED:SASL_PLAINTEXT,REPLICATION:PLAINTEXT
sasl.enabled.mechanisms=PLAIN
(Re)start the Kafka brokers using the KAFKA_OPTS environment variable to pass the JAAS configuration to Kafka brokers.
su - kafka
export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/jaas.conf"
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
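Client applications then authenticate with a matching SASL configuration. A minimal sketch of a client properties file, assuming the user1 account defined above and a listener secured with SASL_PLAINTEXT (the file name client.properties is illustrative):
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="user1" \
    password="123456";
Such a file can be passed to the console tools, for example with bin/kafka-console-producer.sh --broker-list <BrokerAddress> --topic myTopic --producer.config client.properties.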
Additional resources
This procedure describes how to enable SASL SCRAM authentication in Kafka brokers.
Prerequisites
Procedure
Edit or create the /opt/kafka/config/jaas.conf JAAS configuration file. Enable the ScramLoginModule for the KafkaServer context. Make sure this file is the same on all Kafka brokers.
For example:
KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required;
};
Edit the /opt/kafka/config/server.properties Kafka configuration file on all cluster nodes for the following:
Change the listener.security.protocol.map field to specify the SASL_PLAINTEXT or SASL_SSL protocol for the listener where you want to use SASL SCRAM authentication.
Set the sasl.enabled.mechanisms option to SCRAM-SHA-256 or SCRAM-SHA-512.
For example:
listeners=INSECURE://:9092,AUTHENTICATED://:9093,REPLICATION://:9094
listener.security.protocol.map=INSECURE:PLAINTEXT,AUTHENTICATED:SASL_PLAINTEXT,REPLICATION:PLAINTEXT
sasl.enabled.mechanisms=SCRAM-SHA-512
(Re)start the Kafka brokers using the KAFKA_OPTS environment variable to pass the JAAS configuration to Kafka brokers.
su - kafka
export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/jaas.conf"
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
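On the client side, a sketch of the corresponding configuration, assuming a SCRAM user user1 has been created with kafka-configs.sh (see the following procedure) and the listener uses SASL_PLAINTEXT:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="user1" \
    password="123456";
The same properties file can be used, for example, with bin/kafka-console-consumer.sh --bootstrap-server <BrokerAddress> --topic myTopic --consumer.config client.properties.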
Additional resources
This procedure describes how to add new users for authentication using SASL SCRAM.
Prerequisites
Procedure
Use the kafka-configs.sh tool to add new SASL SCRAM users.
bin/kafka-configs.sh --bootstrap-server <BrokerAddress> --alter --add-config 'SCRAM-SHA-512=[password=<Password>]' --entity-type users --entity-name <Username>
For example:
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'SCRAM-SHA-512=[password=123456]' --entity-type users --entity-name user1
Additional resources
For more information about configuring SASL SCRAM authentication in clients, see:
Appendix D, Producer configuration parameters
Appendix C, Consumer configuration parameters
This procedure describes how to remove users when using SASL SCRAM authentication.
Prerequisites
Procedure
Use the kafka-configs.sh tool to delete SASL SCRAM users.
bin/kafka-configs.sh --bootstrap-server <BrokerAddress> --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name <Username>
For example:
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name user1
Additional resources
For more information about configuring SASL SCRAM authentication in clients, see:
Appendix D, Producer configuration parameters
Appendix C, Consumer configuration parameters
AMQ Streams supports the use of OAuth 2.0 authentication using the SASL OAUTHBEARER mechanism. OAuth 2.0 enables standardized token-based authentication and authorization between applications, using a central authorization server to issue tokens that grant limited access to resources.
You can configure OAuth 2.0 authentication, then OAuth 2.0 authorization. OAuth 2.0 authentication can also be used in conjunction with ACL-based Kafka authorization regardless of the authorization server used.
Using OAuth 2.0 token-based authentication, application clients can access resources on application servers (called resource servers) without exposing account credentials. The application client passes an access token as a means of authenticating, which application servers can also use to determine the level of access to grant. The authorization server handles the granting of access and inquiries about access.
In the context of AMQ Streams:
Kafka brokers act as OAuth 2.0 resource servers
Kafka clients act as OAuth 2.0 application clients
Kafka clients authenticate to Kafka brokers. The brokers and clients communicate with the OAuth 2.0 authorization server, as necessary, to obtain or validate access tokens.
For a deployment of AMQ Streams, OAuth 2.0 integration provides:
Server-side OAuth 2.0 support for Kafka brokers
Client-side OAuth 2.0 support for Kafka Mirror Maker, Kafka Connect and the Kafka Bridge
Additional resources
The Kafka SASL OAUTHBEARER mechanism is used to establish authenticated sessions with a Kafka broker. A Kafka client initiates a session with the Kafka broker using the SASL OAUTHBEARER mechanism for credentials exchange, where credentials take the form of an access token. Kafka brokers and clients need to be configured to use OAuth 2.0.
You can configure OAuth 2.0 settings using Java Authentication and Authorization Service (JAAS) properties or environment variables.
JAAS properties are configured in the server.properties configuration file, and passed as key-value pairs of the listener.name.LISTENER-NAME.oauthbearer.sasl.jaas.config property.
If using environment variables, you still need the listener.name.LISTENER-NAME.oauthbearer.sasl.jaas.config property in the server.properties file, but you can omit the other JAAS properties.
You can use capitalized or upper-case environment variable naming conventions.
The Kafka OAuth 2.0 library uses properties that start with oauth. to configure authentication, and properties that start with strimzi. to configure OAuth 2.0 authorization.
Kafka broker configuration for OAuth 2.0 involves:
Creating the OAuth 2.0 client in the authorization server
Configuring OAuth 2.0 authentication in the Kafka cluster
In relation to the authorization server, Kafka brokers and Kafka clients are both regarded as OAuth 2.0 clients.
To configure a Kafka broker to validate the token received during session initiation, the recommended approach is to create an OAuth 2.0 client definition in an authorization server, configured as confidential, with the following client credentials enabled:
Client ID of kafka-broker (for example)
Client ID and secret as the authentication mechanism
You only need to use a client ID and secret when using a non-public introspection endpoint of the authorization server. The credentials are not typically required when using public authorization server endpoints, as with fast local JWT token validation.
To use OAuth 2.0 authentication in the Kafka cluster, you enable a listener configuration for your Kafka cluster in the Kafka server.properties file. A minimum configuration is required. You can also configure a TLS listener, where TLS is used for inter-broker communication.
You can configure the broker for token validation by the authorization server using the:
JWKS endpoint in combination with signed JWT-formatted access tokens
Introspection endpoint
The minimum configuration shown here applies a global listener configuration. This means that inter-broker communication goes through the same listener as application clients.
To enable OAuth 2.0 configuration for a specific listener, you specify listener.name.LISTENER-NAME.sasl.enabled.mechanisms instead of sasl.enabled.mechanisms, which is shown in the listener configuration examples below. LISTENER-NAME is the name of the listener (case insensitive). In the example below, we name the listener CLIENT, so the property name will be listener.name.client.sasl.enabled.mechanisms.
Minimum listener configuration for OAuth 2.0 authentication using a JWKS endpoint
sasl.enabled.mechanisms=OAUTHBEARER 1
listeners=CLIENT://0.0.0.0:9092 2
listener.security.protocol.map=CLIENT:SASL_PLAINTEXT 3
listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER 4
sasl.mechanism.inter.broker.protocol=OAUTHBEARER 5
inter.broker.listener.name=CLIENT 6
listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler 7
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ 8
  oauth.valid.issuer.uri="https://AUTH-SERVER-ADDRESS" \ 9
  oauth.jwks.endpoint.uri="https://AUTH-SERVER-ADDRESS/jwks" \ 10
  oauth.username.claim="preferred_username" \ 11
  oauth.client.id="kafka-broker" \ 12
  oauth.client.secret="kafka-secret" \ 13
  oauth.token.endpoint.uri="https://AUTH-SERVER-ADDRESS/token" ; 14
listener.name.client.oauthbearer.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler 15
listener.name.client.oauthbearer.connections.max.reauth.ms=3600000 16
1. Enables the OAUTHBEARER as SASL mechanism for credentials exchange over SASL.
2. Configures a listener for client applications to connect to. The system hostname is used as an advertised hostname, which clients must resolve in order to reconnect. The listener is named CLIENT in this example.
3. Specifies the channel protocol for the listener. SASL_SSL is for TLS. SASL_PLAINTEXT is used for an unencrypted connection (no TLS), but there is risk of eavesdropping and interception at the TCP connection layer.
4. Specifies OAUTHBEARER as SASL for the CLIENT listener. The client name (CLIENT) is usually specified in uppercase in the listeners property, and in lowercase for listener.name properties (listener.name.client) and when part of a listener.name.client.* property.
5. Specifies OAUTHBEARER as SASL for inter-broker communication.
6. Specifies the listener for inter-broker communication. The specification is required for the configuration to be valid.
7. Configures OAuth 2.0 authentication on the client listener.
8. Configures authentication settings for client and inter-broker communication. The oauth.client.id, oauth.client.secret, and oauth.token.endpoint.uri properties relate to inter-broker configuration.
9. A valid issuer URI. Only access tokens issued by this issuer will be accepted. For example, https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME.
10. The JWKS endpoint URL. For example, https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/certs.
11. The token claim (or key) that contains the actual user name in the token. The user name is the principal used to identify the user. The value will depend on the authentication flow and the authorization server used.
12. Client ID of the Kafka broker, which is the same for all brokers. This is the client registered with the authorization server as kafka-broker.
13. Secret for the Kafka broker, which is the same for all brokers.
14. The OAuth 2.0 token endpoint URL to your authorization server. For production, always use HTTPS. For example, https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/token.
15. Enables (and is only required for) OAuth 2.0 authentication for inter-broker communication.
16. (Optional) Enforces session expiry when the token expires, and also activates the Kafka re-authentication mechanism. If the specified value is less than the time left for the access token to expire, then the client will have to re-authenticate before the actual token expiry. By default, the session does not expire when the access token expires, and the client does not attempt re-authentication.
TLS listener configuration for OAuth 2.0 authentication
sasl.enabled.mechanisms=
listeners=REPLICATION://kafka:9091,CLIENT://kafka:9092 1
listener.security.protocol.map=REPLICATION:SSL,CLIENT:SASL_PLAINTEXT 2
listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER
inter.broker.listener.name=REPLICATION
listener.name.replication.ssl.keystore.password=KEYSTORE-PASSWORD 3
listener.name.replication.ssl.truststore.password=TRUSTSTORE-PASSWORD
listener.name.replication.ssl.keystore.type=JKS
listener.name.replication.ssl.truststore.type=JKS
listener.name.replication.ssl.endpoint.identification.algorithm=HTTPS 4
listener.name.replication.ssl.secure.random.implementation=SHA1PRNG 5
listener.name.replication.ssl.keystore.location=PATH-TO-KEYSTORE 6
listener.name.replication.ssl.truststore.location=PATH-TO-TRUSTSTORE 7
listener.name.replication.ssl.client.auth=required 8
listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.valid.issuer.uri="https://AUTH-SERVER-ADDRESS" \
  oauth.jwks.endpoint.uri="https://AUTH-SERVER-ADDRESS/jwks" \
  oauth.username.claim="preferred_username" ; 9
1. Separate configurations are required for inter-broker communication and client applications.
2. Configures the REPLICATION listener to use TLS, and the CLIENT listener to use SASL over an unencrypted channel. The client could use an encrypted channel (SASL_SSL) in a production environment.
3. The ssl. properties define the TLS configuration.
4. Hostname verification. If set to an empty string, the hostname verification is turned off. If not set, the default value is HTTPS, which enforces hostname verification for server certificates.
5. Random number generator implementation. If not set, the Java platform SDK default is used.
6. Path to the keystore for the listener.
7. Path to the truststore for the listener.
8. Specifies that clients of the REPLICATION listener have to authenticate with a client certificate when establishing a TLS connection (used for inter-broker connectivity).
9. Configures the CLIENT listener for OAuth 2.0. Connectivity with the authorization server should use secure HTTPS connections.
Fast local JWT token validation checks a JWT token signature locally.
The local check ensures that a token:
Conforms to type by containing a (typ) claim value of Bearer for an access token
Is valid (not expired)
Has an issuer that matches a validIssuerURI
You specify a valid issuer URI when you configure the listener, so that any tokens not issued by the authorization server are rejected.
The authorization server does not need to be contacted during fast local JWT token validation. You activate fast local JWT token validation by specifying a JWKS endpoint URI exposed by the OAuth 2.0 authorization server. The endpoint contains the public keys used to validate signed JWT tokens, which are sent as credentials by Kafka clients.
All communication with the authorization server should be performed using HTTPS.
For a TLS listener, you can configure a certificate truststore and point to the truststore file.
Example properties for fast local JWT token validation
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.valid.issuer.uri="https://AUTH-SERVER-ADDRESS" \ 1
  oauth.jwks.endpoint.uri="https://AUTH-SERVER-ADDRESS/jwks" \ 2
  oauth.jwks.refresh.seconds="300" \ 3
  oauth.jwks.refresh.min.pause.seconds="1" \ 4
  oauth.jwks.expiry.seconds="360" \ 5
  oauth.username.claim="preferred_username" \ 6
  oauth.ssl.truststore.location="PATH-TO-TRUSTSTORE-P12-FILE" \ 7
  oauth.ssl.truststore.password="TRUSTSTORE-PASSWORD" \ 8
  oauth.ssl.truststore.type="PKCS12" ; 9
1. A valid issuer URI. Only access tokens issued by this issuer will be accepted. For example, https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME.
2. The JWKS endpoint URL. For example, https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/certs.
3. The period between endpoint refreshes (default 300).
4. The minimum pause in seconds between consecutive attempts to refresh JWKS public keys. When an unknown signing key is encountered, the JWKS keys refresh is scheduled outside the regular periodic schedule with at least the specified pause since the last refresh attempt. The refreshing of keys follows the rule of exponential backoff, retrying on unsuccessful refreshes with an ever-increasing pause, until it reaches oauth.jwks.refresh.seconds. The default value is 1.
5. The duration the JWKS certificates are considered valid before they expire. Default is 360 seconds. If you specify a longer time, consider the risk of allowing access to revoked certificates.
6. The token claim (or key) that contains the actual user name in the token. The user name is the principal used to identify the user. The value will depend on the authentication flow and the authorization server used.
7. The location of the truststore used in the TLS configuration.
8. Password to access the truststore.
9. The truststore type in PKCS #12 format.
Token validation using an OAuth 2.0 introspection endpoint treats a received access token as opaque. The Kafka broker sends an access token to the introspection endpoint, which responds with the token information necessary for validation. Importantly, it returns up-to-date information if the specific access token is valid, and also information about when the token expires.
To configure OAuth 2.0 introspection-based validation, you specify an introspection endpoint URI rather than the JWKS endpoint URI specified for fast local JWT token validation. Depending on the authorization server, you typically have to specify a client ID and client secret, because the introspection endpoint is usually protected.
Example properties for an introspection endpoint
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.introspection.endpoint.uri="https://AUTH-SERVER-ADDRESS/introspection" \ 1
  oauth.client.id="kafka-broker" \ 2
  oauth.client.secret="kafka-broker-secret" \ 3
  oauth.ssl.truststore.location="PATH-TO-TRUSTSTORE-P12-FILE" \ 4
  oauth.ssl.truststore.password="TRUSTSTORE-PASSWORD" \ 5
  oauth.ssl.truststore.type="PKCS12" \ 6
  oauth.username.claim="preferred_username" ; 7
1. The OAuth 2.0 introspection endpoint URI. For example, https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/token/introspect.
2. Client ID of the Kafka broker.
3. Secret for the Kafka broker.
4. The location of the truststore used in the TLS configuration.
5. Password to access the truststore.
6. The truststore type in PKCS #12 format.
7. The token claim (or key) that contains the actual user name in the token. The user name is the principal used to identify the user. The value of oauth.username.claim depends on the authorization server used.
The Kafka SASL OAUTHBEARER mechanism, which is used for OAuth 2.0 authentication in AMQ Streams, supports a Kafka feature called the re-authentication mechanism.
When the re-authentication mechanism is enabled through a listener configuration, the broker’s authenticated session expires when the access token expires. The client must then re-authenticate to the existing session by sending a new, valid access token to the broker, without dropping the connection.
If token validation is successful, a new client session is started using the existing connection. If the client fails to re-authenticate, the broker will close the connection if further attempts are made to send or receive messages. Java clients that use Kafka client library 2.2 or later automatically re-authenticate if the re-authentication mechanism is enabled on the broker.
You enable session re-authentication for a Kafka broker in the Kafka server.properties file. Set the connections.max.reauth.ms property for a TLS listener with OAUTHBEARER enabled as the SASL mechanism.
You can specify session re-authentication per listener. For example:
listener.name.client.oauthbearer.connections.max.reauth.ms=3600000
Session re-authentication is supported for both types of token validation (fast local JWT and introspection endpoint). For an example configuration, see Section 4.10.6.2, “Configuring OAuth 2.0 support for Kafka brokers” . For more information about the re-authentication mechanism, which was added in Kafka version 2.2, see KIP-368 .
A Kafka client is configured with either:
The credentials required to obtain a valid access token from an authorization server (client ID and secret)
A valid long-lived access token or refresh token, obtained using tools provided by an authorization server
Credentials are never sent to the Kafka broker. The only information ever sent to the Kafka broker is an access token. When a client obtains an access token, no further communication with the authorization server is needed.
The simplest mechanism is authentication with a client ID and secret. Using a long-lived access token, or a long-lived refresh token, adds more complexity because there is an additional dependency on authorization server tools. If you are using long-lived access tokens, you may need to configure the client in the authorization server to increase the maximum lifetime of the token.
If the Kafka client is not configured with an access token directly, the client exchanges credentials for an access token during Kafka session initiation by contacting the authorization server. The Kafka client exchanges either:
Client ID and secret
Client ID, refresh token, and (optionally) a secret
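As an illustrative sketch (not part of the original text), a Kafka client using the Strimzi OAuth login callback handler with a client ID and secret might use properties along these lines; the client ID, secret, and token endpoint address are assumptions for the example:
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.client.id="kafka-client" \
  oauth.client.secret="kafka-client-secret" \
  oauth.token.endpoint.uri="https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/token" ;
sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
With such a configuration, the client exchanges the client ID and secret for an access token at session initiation, as described above.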
In this section, we explain and visualize the communication flow between Kafka client, Kafka broker, and authorization server during Kafka session initiation. The flow depends on the client and server configuration.
When a Kafka client sends an access token as credentials to a Kafka broker, the token needs to be validated. Depending on the authorization server used, and the configuration options available, you may prefer to use:
Fast local token validation based on JWT signature checking and local token introspection, without contacting the authorization server
An OAuth 2.0 introspection endpoint provided by the authorization server
Using fast local token validation requires the authorization server to provide a JWKS endpoint with public certificates that are used to validate signatures on the tokens.
Another option is to use an OAuth 2.0 introspection endpoint on the authorization server. Each time a new Kafka broker connection is established, the broker passes the access token received from the client to the authorization server, and checks the response to confirm whether or not the token is valid.
Kafka client credentials can also be configured for:
Direct local access using a previously generated long-lived access token
Contact with the authorization server for a new access token to be issued
An authorization server might only allow the use of opaque access tokens, which means that local token validation is not possible.
Here you can see the communication flows, for different configurations of Kafka clients and brokers, during Kafka session authentication:
Client using client ID and secret, with broker delegating validation to authorization server
Client using client ID and secret, with broker performing fast local token validation
Client using long-lived access token, with broker delegating validation to authorization server
Client using long-lived access token, with broker performing fast local validation
Client using client ID and secret, with broker delegating validation to authorization server
1. The Kafka client requests an access token from the authorization server, using a client ID and secret, and optionally a refresh token.
2. The authorization server generates a new access token.
3. The Kafka client authenticates with the Kafka broker using the SASL OAUTHBEARER mechanism to pass the access token.
4. The Kafka broker validates the access token by calling a token introspection endpoint on the authorization server, using its own client ID and secret.
5. A Kafka client session is established if the token is valid.
Client using client ID and secret, with broker performing fast local token validation
1. Kafka client requests an access token from the authorization server's token endpoint, using a client ID and secret, and optionally a refresh token.
2. Authorization server generates a new access token.
3. Kafka client authenticates with the Kafka broker using the SASL OAUTHBEARER mechanism to pass the access token.
4. Kafka broker validates the access token locally using a JWT token signature check, and local token introspection.
Client using long-lived access token, with broker delegating validation to authorization server
1. Kafka client authenticates with the Kafka broker using the SASL OAUTHBEARER mechanism to pass the long-lived access token.
2. Kafka broker validates the access token by calling a token introspection endpoint on the authorization server, using its own client ID and secret.
3. A Kafka client session is established if the token is valid.
Client using long-lived access token, with broker performing fast local validation
1. Kafka client authenticates with the Kafka broker using the SASL OAUTHBEARER mechanism to pass the long-lived access token.
2. Kafka broker validates the access token locally using a JWT token signature check, and local token introspection.
Fast local JWT token signature validation is suitable only for short-lived tokens, as there is no check with the authorization server if a token has been revoked. Token expiration is written into the token, but revocation can happen at any time and cannot be accounted for without contacting the authorization server. Any issued token would be considered valid until it expires.
OAuth 2.0 is used for interaction between Kafka clients and AMQ Streams components. In order to use OAuth 2.0 for AMQ Streams, you must:
- Configure an OAuth 2.0 authorization server for the AMQ Streams cluster and Kafka clients
- Deploy or update the Kafka cluster with Kafka broker listeners configured to use OAuth 2.0
- Update your Java-based Kafka clients to use OAuth 2.0
This procedure describes how to deploy Red Hat Single Sign-On as an authorization server and configure it for integration with AMQ Streams.

The authorization server provides a central point for authentication and authorization, and management of users, clients, and permissions. Red Hat Single Sign-On has a concept of realms, where a realm represents a separate set of users, clients, permissions, and other configuration. You can use a default master realm, or create a new one. Each realm exposes its own OAuth 2.0 endpoints, which means that application clients and application servers all need to use the same realm.

To use OAuth 2.0 with AMQ Streams, you need a deployment of an authorization server to be able to create and manage authentication realms. If you already have Red Hat Single Sign-On deployed, you can skip the deployment step and use your current deployment.
Before you begin
You will need to be familiar with using Red Hat Single Sign-On. For installation and administration instructions, see:
- Server Installation and Configuration Guide
- Server Administration Guide
Prerequisites
Procedure
Install Red Hat Single Sign-On.
You can install from a ZIP file or by using an RPM.
Log in to the Red Hat Single Sign-On Admin Console to create the OAuth 2.0 policies for AMQ Streams.
Login details are provided when you deploy Red Hat Single Sign-On.
Create and enable a realm.
You can use an existing master realm.
Adjust the session and token timeouts for the realm, if required.
Create a client called kafka-broker.
From the Settings tab, set:
- Access Type to Confidential
- Standard Flow Enabled to OFF to disable web login for this client
- Service Accounts Enabled to ON to allow this client to authenticate in its own name

Click Save before continuing.
From the Credentials tab, take a note of the secret to use in your AMQ Streams Kafka cluster configuration.
Repeat the client creation steps for any application client that will connect to your Kafka brokers.
Create a definition for each new client.
You will use the names as client IDs in your configuration.
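Optionally, you can confirm that the kafka-broker client and its secret work before configuring the brokers by requesting a token with the standard OAuth 2.0 client_credentials grant. The following is a minimal sketch (Java 11 or later); the server address, realm name, and secret are placeholders, and Service Accounts Enabled must be set to ON for the client.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class TokenEndpointCheck {
    public static void main(String[] args) throws Exception {
        // Standard OAuth 2.0 client_credentials grant against the realm's token endpoint.
        String form = "grant_type=client_credentials"
                + "&client_id=kafka-broker"
                + "&client_secret=KAFKA-BROKER-SECRET";   // secret noted from the Credentials tab

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/token"))
                .header("Content-Type", "application/x-www-form-urlencoded")
                .POST(HttpRequest.BodyPublishers.ofString(form))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());

        // A 200 response containing an access_token field confirms the client definition works.
        System.out.println(response.statusCode());
        System.out.println(response.body());
    }
}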
What to do next
After deploying and configuring the authorization server, configure the Kafka brokers to use OAuth 2.0.
This procedure describes how to configure Kafka brokers so that the broker listeners are enabled to use OAuth 2.0 authentication with an authorization server.

We advise using OAuth 2.0 over an encrypted interface by configuring TLS listeners. Plain listeners are not recommended.

Configure the Kafka brokers using properties that support your chosen authorization server, and the type of authorization you are implementing.
Before you start
For more information on the configuration and authentication of Kafka broker listeners, see:
- Listeners
- Encryption and authentication

For a description of the properties used in the listener configuration, see:
- OAuth 2.0 Kafka broker configuration
Prerequisites
Procedure
Configure the Kafka broker listener configuration in the server.properties file.
For example:
sasl.enabled.mechanisms=OAUTHBEARER
listeners=CLIENT://0.0.0.0:9092
listener.security.protocol.map=CLIENT:SASL_PLAINTEXT
listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER
sasl.mechanism.inter.broker.protocol=OAUTHBEARER
inter.broker.listener.name=CLIENT
listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ;
listener.name.client.oauthbearer.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
Configure broker connection settings as part of listener.name.client.oauthbearer.sasl.jaas.config.
The examples here show connection configuration options.
Example 1: Local token validation using a JWKS endpoint configuration
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.valid.issuer.uri="https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME" \
  oauth.jwks.endpoint.uri="https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/certs" \
  oauth.jwks.refresh.seconds="300" \
  oauth.jwks.refresh.min.pause.seconds="1" \
  oauth.jwks.expiry.seconds="360" \
  oauth.username.claim="preferred_username" \
  oauth.ssl.truststore.location="PATH-TO-TRUSTSTORE-P12-FILE" \
  oauth.ssl.truststore.password="TRUSTSTORE-PASSWORD" \
  oauth.ssl.truststore.type="PKCS12" ;
listener.name.client.oauthbearer.connections.max.reauth.ms=3600000
Example 2: Delegating token validation to the authorization server through the OAuth 2.0 introspection endpoint
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
oauth.introspection.endpoint.uri="https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/introspection" \
# ...
If required, configure access to the authorization server. This step is normally required for a production environment, unless a technology like service mesh is used to configure secure channels outside containers.
Provide a custom truststore for connecting to a secured authorization server. SSL is always required for access to the authorization server.
Set properties to configure the truststore.
For example:
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  # ...
  oauth.client.id="kafka-broker" \
  oauth.client.secret="kafka-broker-secret" \
  oauth.ssl.truststore.location="PATH-TO-TRUSTSTORE-P12-FILE" \
  oauth.ssl.truststore.password="TRUSTSTORE-PASSWORD" \
  oauth.ssl.truststore.type="PKCS12" ;
If the certificate hostname does not match the access URL hostname, you can turn off certificate hostname validation:
oauth.ssl.endpoint.identification.algorithm=""
The check ensures that the client connection to the authorization server is authentic. You may wish to turn off the validation in a non-production environment.

Configure additional properties according to your chosen authentication flow:
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  # ...
  oauth.token.endpoint.uri="https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/token" \ 1
  oauth.valid.issuer.uri="https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME" \ 2
  oauth.client.id="kafka-broker" \ 3
  oauth.client.secret="kafka-broker-secret" \ 4
  oauth.refresh.token="REFRESH-TOKEN-FOR-KAFKA-BROKERS" \ 5
  oauth.access.token="ACCESS-TOKEN-FOR-KAFKA-BROKERS" ; 6
1. The URI of the OAuth 2.0 token endpoint on your authorization server. Required when KeycloakRBACAuthorizer is used, or when an OAuth 2.0 enabled listener is used for inter-broker communication.
2. A valid issuer URI. Only access tokens issued by this issuer will be accepted. (Always required.)
3. The configured client ID of the Kafka broker, which is the same for all brokers. This is the client registered with the authorization server as kafka-broker. Required when an introspection endpoint is used for token validation, or when KeycloakRBACAuthorizer is used.
4. The configured secret for the Kafka broker, which is the same for all brokers. When the broker must authenticate to the authorization server, either a client secret, access token or a refresh token has to be specified.
5. (Optional) A long-lived refresh token for Kafka brokers.
6. (Optional) A long-lived access token for Kafka brokers.
Depending on how you apply OAuth 2.0 authentication, and the type of authorization server being used, add additional configuration settings:
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  # ...
  oauth.check.issuer=false \ 1
  oauth.fallback.username.claim="CLIENT-ID" \ 2
  oauth.fallback.username.prefix="CLIENT-ACCOUNT" \ 3
  oauth.valid.token.type="bearer" \ 4
  oauth.userinfo.endpoint.uri="https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/userinfo" ; 5
1. If your authorization server does not provide an iss claim, it is not possible to perform an issuer check. In this situation, set oauth.check.issuer to false and do not specify an oauth.valid.issuer.uri. Default is true.
2. An authorization server may not provide a single attribute to identify both regular users and clients. When a client authenticates in its own name, the server might provide a client ID. When a user authenticates using a username and password, to obtain a refresh token or an access token, the server might provide a username attribute in addition to a client ID. Use this fallback option to specify the username claim (attribute) to use if a primary user ID attribute is not available.
3. In situations where oauth.fallback.username.claim is applicable, it may also be necessary to prevent name collisions between the values of the username claim, and those of the fallback username claim. Consider a situation where a client called producer exists, but also a regular user called producer exists. In order to differentiate between the two, you can use this property to add a prefix to the user ID of the client.
4. (Only applicable when using oauth.introspection.endpoint.uri) Depending on the authorization server you are using, the introspection endpoint may or may not return the token type attribute, or it may contain different values. You can specify a valid token type value that the response from the introspection endpoint has to contain.
5. (Only applicable when using oauth.introspection.endpoint.uri) The authorization server may be configured or implemented in such a way as to not provide any identifiable information in an introspection endpoint response. In order to obtain the user ID, you can configure the URI of the userinfo endpoint as a fallback. The oauth.username.claim, oauth.fallback.username.claim, and oauth.fallback.username.prefix settings are applied to the response of the userinfo endpoint.
What to do next
After configuring OAuth 2.0 authentication on your Kafka brokers, configure your Kafka clients to use OAuth 2.0.
This procedure describes how to configure Kafka producer and consumer APIs to use OAuth 2.0 for interaction with Kafka brokers. Add a client callback plugin to your pom.xml file, and configure the system properties.
Prerequisites
Procedure
Add the client library with OAuth 2.0 support to the pom.xml file for the Kafka client:
<dependency>
 <groupId>io.strimzi</groupId>
 <artifactId>kafka-oauth-client</artifactId>
 <version>0.6.1.redhat-00003</version>
</dependency>
Configure the system properties for the callback.
For example:
System.setProperty(ClientConfig.OAUTH_TOKEN_ENDPOINT_URI, "https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/token"); 1
System.setProperty(ClientConfig.OAUTH_CLIENT_ID, "CLIENT-NAME"); 2
System.setProperty(ClientConfig.OAUTH_CLIENT_SECRET, "CLIENT_SECRET"); 3
System.setProperty(ClientConfig.OAUTH_SCOPE, "SCOPE-VALUE"); 4
1. The URI of the OAuth 2.0 token endpoint for your authorization server.
2. The client ID, as registered in the authorization server.
3. The client secret, as created in the authorization server.
4. The scope for requesting the token from the token endpoint. An authorization server may require a client to specify the scope.
Enable the SASL OAUTHBEARER mechanism on a TLS encrypted connection in the Kafka client configuration.
For example:
props.put("sasl.jaas.config", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;");
props.put("security.protocol", "SASL_SSL"); 1
props.put("sasl.mechanism", "OAUTHBEARER");
props.put("sasl.login.callback.handler.class", "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler");
1. SASL_SSL for use over TLS connections. Use SASL_PLAINTEXT over unencrypted connections.
Verify that the Kafka client can access the Kafka brokers.
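The following sketch ties the previous steps together in a minimal producer that can be used for this verification. The broker address, client name, secret, and topic are placeholders; the oauth.* system property names are assumed to correspond to the ClientConfig constants used above.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class OauthProducerCheck {
    public static void main(String[] args) throws Exception {
        // Credentials for the OAuth login callback handler (placeholders).
        System.setProperty("oauth.token.endpoint.uri",
                "https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/token");
        System.setProperty("oauth.client.id", "CLIENT-NAME");
        System.setProperty("oauth.client.secret", "CLIENT-SECRET");

        Properties props = new Properties();
        props.put("bootstrap.servers", "BROKER-ADDRESS:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        // SASL OAUTHBEARER settings from the procedure above.
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "OAUTHBEARER");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;");
        props.put("sasl.login.callback.handler.class",
                "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // If authentication fails, the exception surfaces here.
            producer.send(new ProducerRecord<>("MY-TOPIC", "hello")).get();
            System.out.println("Message sent, OAuth authentication succeeded");
        }
    }
}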
If you are using OAuth 2.0 with Red Hat Single Sign-On for token-based authentication, you can also use Red Hat Single Sign-On to configure authorization rules to constrain client access to Kafka brokers. Authentication establishes the identity of a user. Authorization decides the level of access for that user.
AMQ Streams supports the use of OAuth 2.0 token-based authorization through Red Hat Single Sign-On Authorization Services, which allows you to manage security policies and permissions centrally.
Security policies and permissions defined in Red Hat Single Sign-On are used to grant access to resources on Kafka brokers. Users and clients are matched against policies that permit access to perform specific actions on Kafka brokers.
Kafka allows all users full access to brokers by default, and also provides the AclAuthorizer plugin to configure authorization based on Access Control Lists (ACLs). ZooKeeper stores ACL rules that grant or deny access to resources based on username. However, OAuth 2.0 token-based authorization with Red Hat Single Sign-On offers far greater flexibility on how you wish to implement access control to Kafka brokers. In addition, you can configure your Kafka brokers to use OAuth 2.0 authorization and ACLs.
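For comparison, the following sketch shows what a single ACL rule looks like when created through the Kafka Admin API. The broker address, principal, and topic name are placeholders; this only illustrates the ACL model and is not part of the Red Hat Single Sign-On setup.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

public class CreateAclExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "BROKER-ADDRESS:9092");

        // Requires an authorizer (for example, AclAuthorizer) to be enabled on the brokers.
        try (AdminClient admin = AdminClient.create(props)) {
            // Allow user "alice" to write to topic "my-topic" from any host.
            AclBinding binding = new AclBinding(
                    new ResourcePattern(ResourceType.TOPIC, "my-topic", PatternType.LITERAL),
                    new AccessControlEntry("User:alice", "*", AclOperation.WRITE, AclPermissionType.ALLOW));
            admin.createAcls(Collections.singleton(binding)).all().get();
        }
    }
}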
Additional resources
OAuth 2.0 authorization in AMQ Streams uses Red Hat Single Sign-On server Authorization Services REST endpoints to extend token-based authentication with Red Hat Single Sign-On by applying defined security policies on a particular user, and providing a list of permissions granted on different resources for that user. Policies use roles and groups to match permissions to users. OAuth 2.0 authorization enforces permissions locally based on the received list of grants for the user from Red Hat Single Sign-On Authorization Services.
A Red Hat Single Sign-On authorizer (KeycloakRBACAuthorizer) is provided with AMQ Streams. To use the Authorization Services REST endpoints provided by Red Hat Single Sign-On, you configure a custom authorizer on the Kafka broker.
The authorizer fetches a list of granted permissions from the authorization server as needed, and enforces authorization locally on the Kafka Broker, making rapid authorization decisions for each client request.
This procedure describes how to configure Kafka brokers to use OAuth 2.0 authorization using Red Hat Single Sign-On Authorization Services.
Before you begin
Consider the access you require or want to limit for certain users. You can use a combination of Red Hat Single Sign-On groups, roles, clients, and users to configure access in Red Hat Single Sign-On. Typically, groups are used to match users based on organizational departments or geographical locations, and roles are used to match users based on their function. With Red Hat Single Sign-On, you can store users and groups in LDAP, whereas clients and roles cannot be stored this way. Storage and access to user data may be a factor in how you choose to configure authorization policies. Super users always have unconstrained access to a Kafka broker regardless of the authorization implemented on the Kafka broker.
Prerequisites
Procedure
Add the following to the Kafka server.properties configuration file to install the authorizer in Kafka:
authorizer.class.name=io.strimzi.kafka.oauth.server.authorizer.KeycloakRBACAuthorizer
principal.builder.class=io.strimzi.kafka.oauth.server.authorizer.JwtKafkaPrincipalBuilder
Add configuration for the Kafka brokers to access the authorization server and Authorization Services. Here we show example configuration added as additional properties to server.properties, but you can also define them as environment variables using capitalized or upper-case naming conventions.
strimzi.authorization.token.endpoint.uri="https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/token" 1
strimzi.authorization.client.id="kafka" 2
1. The URI of the OAuth 2.0 token endpoint for your Red Hat Single Sign-On server.
2. The client ID of the OAuth 2.0 client definition in Red Hat Single Sign-On. By convention, kafka is used as the ID.
(Optional) Add configuration for specific Kafka clusters.
For example:
strimzi.authorization.kafka.cluster.name="kafka-cluster" 1
1. The name of a specific Kafka cluster. The name is used to target permissions, making it possible to manage multiple Kafka clusters within the same Red Hat Single Sign-On realm. The default value is kafka-cluster.
(Optional) Delegate to simple authorization.
For example:
strimzi.authorization.delegate.to.kafka.acl="false" 1
1. Whether authorization decisions are delegated to the Kafka AclAuthorizer if access is denied by Red Hat Single Sign-On Authorization Services policies. The default is false.
(Optional) Add configuration for TLS connection to the authorization server.
For example:
strimzi.authorization.ssl.truststore.location=<path-to-truststore> 1
strimzi.authorization.ssl.truststore.password=<my-truststore-password> 2
strimzi.authorization.ssl.truststore.type=JKS 3
strimzi.authorization.ssl.secure.random.implementation=SHA1PRNG 4
strimzi.authorization.ssl.endpoint.identification.algorithm=HTTPS 5
1. The path to the truststore that contains the certificates for connecting to the authorization server.
2. The password for accessing the truststore.
3. The truststore type.
4. The secure random implementation used for TLS connections.
5. Hostname verification. The default value is HTTPS, which enforces hostname verification for server certificates.
(Optional) Configure the refresh of grants from the authorization server. The grants refresh job works by enumerating the active tokens and requesting the latest grants for each.
For example:
strimzi.authorization.grants.refresh.period.seconds="120" 1
strimzi.authorization.grants.refresh.pool.size="10" 2
"0"
.
Specifies the size of the thread pool (the degree of parallelism) used by the grants refresh job. The default value is
"5"
.
Verify the configured permissions by accessing Kafka brokers as clients or users with specific roles, making sure they have the necessary access and do not have access they should not have.
Open Policy Agent (OPA) is an open-source policy engine. You can integrate OPA with AMQ Streams to act as a policy-based authorization mechanism for permitting client operations on Kafka brokers. When a request is made from a client, OPA will evaluate the request against policies defined for Kafka access, then allow or deny the request. Red Hat does not support the OPA server.
Additional resources
Before integrating OPA with AMQ Streams, consider how you will define policies to provide fine-grained access controls.
You can define access control for Kafka clusters, consumer groups and topics. For instance, you can define an authorization policy that allows write access from a producer client to a specific broker topic.
For this, the policy might specify the:
- User principal and host address associated with the producer client
- Operations allowed for the client
- Resource type (topic) and resource name the policy applies to
Allow and deny decisions are written into the policy, and a response is provided based on the request and client identification data provided.
In our example the producer client would have to satisfy the policy to be allowed to write to the topic.
To enable Kafka to access the OPA policy engine to query access control policies, you configure a custom OPA authorizer plugin (kafka-authorizer-opa-VERSION.jar) in your Kafka server.properties file.
When a request is made by a client, the OPA policy engine is queried by the plugin using a specified URL address and a REST endpoint, which must be the name of the defined policy.
The plugin provides the details of the client request — user principal, operation, and resource — in JSON format to be checked against the policy. The details will include the unique identity of the client; for example, taking the distinguished name from the client certificate if TLS authentication is used.
OPA uses the data to provide a response, either true or false, to the plugin to allow or deny the request.
This procedure describes how to configure Kafka brokers to use OPA authorization.
Before you begin
Consider the access you require or want to limit for certain users. You can use a combination of users and Kafka resources to define OPA policies. It is possible to set up OPA to load user information from an LDAP data source. Super users always have unconstrained access to a Kafka broker regardless of the authorization implemented on the Kafka broker.
Prerequisites
Procedure
Write the OPA policies required for authorizing client requests to perform operations on the Kafka brokers.
See Defining OPA policies.
Now configure the Kafka brokers to use OPA.
Install the OPA authorizer plugin for Kafka.
See Connecting to the OPA.
Make sure that the plugin files are included in the Kafka classpath.
Add the following to the Kafka server.properties configuration file to enable the OPA plugin:
authorizer.class.name=com.bisnode.kafka.authorization.OpaAuthorizer
Add further configuration to server.properties for the Kafka brokers to access the OPA policy engine and policies.
For example:
opa.authorizer.url=https://OPA-ADDRESS/allow 1
opa.authorizer.allow.on.error=false 2
opa.authorizer.cache.initial.capacity=50000 3
opa.authorizer.cache.maximum.size=50000 4
opa.authorizer.cache.expire.after.seconds=600000 5
super.users=User:alice;User:bob 6
1. The URL used to connect to the OPA policy engine, including the REST endpoint, which is the name of the defined policy (in this example, allow).
2. Flag to specify whether a client is allowed or denied access by default if the authorizer plugin fails to connect with the OPA policy engine.
3. Initial capacity in bytes of the local cache. The cache is used so that the plugin does not have to query the OPA policy engine for every request.
4. Maximum capacity in bytes of the local cache.
5. Time in seconds after which entries in the local cache are refreshed by reloading from the OPA policy engine.
6. A list of user principals treated as super users, so that they are always allowed without querying the Open Policy Agent policy.
Refer to the Open Policy Agent website for information on authentication and authorization options.
Verify the configured permissions by accessing Kafka brokers using clients that have and do not have the correct authorization.
Kafka brokers use Log4j as their logging infrastructure. By default, the logging configuration is read from the log4j.properties configuration file, which should be placed either in the /opt/kafka/config/ directory or on the classpath. The location and name of the configuration file can be changed using the Java property log4j.configuration, which can be passed to Kafka by using the KAFKA_LOG4J_OPTS environment variable:
su - kafka
export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:/my/path/to/log4j.config"; /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
For more information about Log4j configurations, see the Log4j manual.
Kafka broker logging is provided by multiple broker loggers in each broker. You can dynamically change the logging level for broker loggers without having to restart the broker. Increasing the level of detail returned in logs, for example by changing from INFO to DEBUG, is useful for investigating performance issues in a Kafka cluster.
Broker loggers can also be dynamically reset to their default logging levels.
Procedure
Switch to the kafka user:
su - kafka
List all the broker loggers for a broker by using the kafka-configs.sh tool:
/opt/kafka/bin/kafka-configs.sh --bootstrap-server BOOTSTRAP-ADDRESS --describe --entity-type broker-loggers --entity-name BROKER-ID
For example, for broker 0:
/opt/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type broker-loggers --entity-name 0
This returns the logging level for each logger: TRACE, DEBUG, INFO, WARN, ERROR, or FATAL. For example:
kafka.controller.ControllerChannelManager=INFO sensitive=false synonyms={}
kafka.log.TimeIndex=INFO sensitive=false synonyms={}
Change the logging level for one or more broker loggers. Use the --alter and --add-config options and specify each logger and its level as a comma-separated list in double quotes.
/opt/kafka/bin/kafka-configs.sh --bootstrap-server BOOTSTRAP-ADDRESS --alter --add-config "LOGGER-ONE=NEW-LEVEL,LOGGER-TWO=NEW-LEVEL" --entity-type broker-loggers --entity-name BROKER-ID
For example, for broker 0:
/opt/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config "kafka.controller.ControllerChannelManager=WARN,kafka.log.TimeIndex=WARN" --entity-type broker-loggers --entity-name 0
If successful, this returns:
Completed updating config for broker: 0.
You can reset one or more broker loggers to their default logging levels by using the kafka-configs.sh tool. Use the --alter and --delete-config options and specify each broker logger as a comma-separated list in double quotes:
/opt/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --delete-config "LOGGER-ONE,LOGGER-TWO" --entity-type broker-loggers --entity-name BROKER-ID
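The same changes can also be made programmatically through the Kafka Admin API, which addresses broker loggers as a BROKER_LOGGER config resource. The following is a minimal sketch using the broker address and logger names from the examples above.

import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class BrokerLoggerExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Broker loggers for broker 0 are addressed as a BROKER_LOGGER config resource.
            ConfigResource loggers = new ConfigResource(ConfigResource.Type.BROKER_LOGGER, "0");

            // Set a logger level (equivalent to --alter --add-config).
            AlterConfigOp setLevel = new AlterConfigOp(
                    new ConfigEntry("kafka.log.TimeIndex", "WARN"), AlterConfigOp.OpType.SET);

            // Reset a logger to its default level (equivalent to --alter --delete-config).
            AlterConfigOp resetLevel = new AlterConfigOp(
                    new ConfigEntry("kafka.controller.ControllerChannelManager", ""), AlterConfigOp.OpType.DELETE);

            admin.incrementalAlterConfigs(
                    Collections.singletonMap(loggers, Arrays.asList(setLevel, resetLevel)))
                    .all().get();
        }
    }
}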
Additional resources