使用相同的 Microsoft Entra 域服务域和相同的 VNet 创建安全的 Kafka 群集和安全的 Spark 群集。 如果你不想在同一 VNet 中创建这两个群集,可以在两个不同的 VNet 中创建这些群集并将 VNet 对等互连。 如果你不想在同一 VNet 中创建这两个群集。
如果群集位于不同的 VNet 中,请参阅
通过 Azure 门户使用虚拟网络对等互连连接虚拟网络
为两个用户创建密钥表。 例如,
alicetest
和
bobadmin
。
什么是密钥表?
密钥表是包含 Kerberos 主体和加密密钥(从 Kerberos 密码派生)对的文件。 可以使用密钥表文件通过 Kerberos 对各种远程系统进行身份验证,而无需输入密码。
有关此主题的详细信息,请参阅
KTUTIL
创建 Kerberos 主体和密钥表文件
ktutil
ktutil: addent -password -p [email protected] -k 1 -e RC4-HMAC
Password for [email protected]:
ktutil: wkt user1.keytab
ktutil: q
创建从 Kafka 主题读取数据的 Spark 流式处理 Java 应用程序。 本文档使用 DirectKafkaWorkCount 示例,该示例基于 https://github.com/apache/spark/blob/branch-2.3/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java 中的 Spark 流式处理示例
方案高级演练
在 Kafka 群集上进行以下设置:
创建主题 alicetopic2
、bobtopic2
向主题 alicetopic2
、bobtopic2
生成数据
设置 Ranger 策略以允许 alicetest
用户从 alicetopic*
读取数据
设置 Ranger 策略以允许 bobadmin
用户从 *
读取数据
在 Spark 群集上执行的方案
以 alicetest
用户身份使用 alicetopic2
中的数据。 Spark 作业将成功运行,YARN UI 中应会输出主题中的单词计数。 Kafka 群集中的 Ranger 审核记录将显示允许访问。
以 alicetest
用户身份使用 bobtopic2
中的数据。 Spark 作业将会失败并出现错误 org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [bobtopic2]
。 Kafka 群集中的 Ranger 审核记录将显示拒绝访问。
以 bobadmin
用户身份使用 alicetopic2
中的数据。 Spark 作业将成功运行,YARN UI 中应会输出主题中的单词计数。 Kafka 群集中的 Ranger 审核记录将显示允许访问。
以 bobadmin
用户身份使用 bobtopic2
中的数据。 Spark 作业将成功运行,YARN UI 中应会输出主题中的单词计数。 Kafka 群集中的 Ranger 审核记录将显示允许访问。
在 Kafka 群集中,设置 Ranger 策略并根据本部分所述从 Kafka 群集生成数据
转到Kafka 群集上的 Ranger UI 并设置两个 Ranger 策略
为 alicetest
添加 Ranger 策略,该策略通过通配符模式 alicetopic*
使用主题访问权限
为 bobadmin
添加 Ranger 策略,该策略通过通配符模式 *
对所有主题进行各种访问
根据参数值执行以下命令
sshuser@hn0-umasec:~$ sudo apt -y install jq
sshuser@hn0-umasec:~$ export clusterName='YOUR_CLUSTER_NAME'
sshuser@hn0-umasec:~$ export TOPICNAME='YOUR_TOPIC_NAME'
sshuser@hn0-umasec:~$ export password='YOUR_SSH_PASSWORD'
sshuser@hn0-umasec:~$ export KAFKABROKERS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
sshuser@hn0-umasec:~$ export KAFKAZKHOSTS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2);
sshuser@hn0-umasec:~$ echo $KAFKABROKERS
wn0-umasec.securehadooprc.onmicrosoft.com:9092,
wn1-umasec.securehadooprc.onmicrosoft.com:9092
使用 ktutil
工具为用户 bobadmin
创建密钥表。
将此文件命名为 bobadmin.keytab
sshuser@hn0-umasec:~$ ktutil
ktutil: addent -password -p [email protected] -k 1 -e RC4-HMAC
Password for <username>@<DOMAIN.COM>
ktutil: wkt bobadmin.keytab
ktutil: q
Kinit the created keytab
sudo kinit [email protected] -t bobadmin.keytab
创建 bobadmin_jaas.config
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="./bobadmin.keytab"
useTicketCache=false
serviceName="kafka"
principal="[email protected]";
以 bobadmin
身份创建主题 alicetopic2
和 bobtopic2
sshuser@hn0-umasec:~$ java -jar -Djava.security.auth.login.config=bobadmin_jaas.conf kafka-producer-consumer.jar create alicetopic2 $KAFKABROKERS
sshuser@hn0-umasec:~$ java -jar -Djava.security.auth.login.config=bobadmin_jaas.conf kafka-producer-consumer.jar create bobtopic2 $KAFKABROKERS
以 bobadmin
身份向 alicetopic2
生成数据
sshuser@hn0-umasec:~$ java -jar -Djava.security.auth.login.config=bobadmin_jaas.conf kafka-producer-consumer.jar producer alicetopic2 $KAFKABROKERS
以 bobadmin
身份向 bobtopic2
生成数据
sshuser@hn0-umasec:~$ java -jar -Djava.security.auth.login.config=bobadmin_jaas.conf kafka-producer-consumer.jar producer bobadmin2 $KAFKABROKERS
在 Spark 群集中,在 Spark 工作器节点中的 /etc/hosts
内添加条目,为 Kafka 工作器节点创建密钥表、jaas_config 文件,并执行 spark-submit 以提交 Spark 作业来从 Kafka 主题读取数据:
使用 sshuser 凭据通过 SSH 连接到 Spark 群集
为 Spark 群集的 /etc/hosts
中的 Kafka 工作器节点创建条目。
为每个 Spark 节点(头节点 + 工作器节点)中的这些 Kafka 工作器节点创建条目。 可以从 Kafka 头节点的 /etc/hosts 中的 Kafka 群集获取这些详细信息。
10.3.16.118 wn0-umasec.securehadooprc.onmicrosoft.com wn0-umasec wn0-umasec.securehadooprc.onmicrosoft.com. wn0-umasec.cu1cgjaim53urggr4psrgczloa.cx.internal.cloudapp.net
10.3.16.145 wn1-umasec.securehadooprc.onmicrosoft.com wn1-umasec wn1-umasec.securehadooprc.onmicrosoft.com. wn1-umasec.cu1cgjaim53urggr4psrgczloa.cx.internal.cloudapp.net
10.3.16.176 wn2-umasec.securehadooprc.onmicrosoft.com wn2-umasec wn2-umasec.securehadooprc.onmicrosoft.com. wn2-umasec.cu1cgjaim53urggr4psrgczloa.cx.internal.cloudapp.net
使用 ktutil 工具为用户 bobadmin
创建密钥表。 将此文件命名为 bobadmin.keytab
使用 ktutil 工具为用户 alicetest
创建密钥表。 将此文件命名为 alicetest.keytab
如以下示例所示创建一个 bobadmin_jaas.conf
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="./bobadmin.keytab"
useTicketCache=false
serviceName="kafka"
principal="[email protected]";
如以下示例所示创建一个 alicetest_jaas.conf
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="./alicetest.keytab"
useTicketCache=false
serviceName="kafka"
principal="[email protected]";
准备好 spark-streaming jar。
按照此处的 DirectKafkaWorkCount
示例和说明生成你自己的 jar,用于从 Kafka 主题读取数据
为方便起见,此示例中使用的示例 jar 是按照以下步骤从 https://github.com/markgrover/spark-secure-kafka-app 生成的。
sudo apt install maven
git clone https://github.com/markgrover/spark-secure-kafka-app.git
cd spark-secure-kafka-app
mvn clean package
cd target
在 Spark 群集中,允许以用户 alicetest
身份从 Kafka 主题 alicetopic2
读取数据
运行 kdestroy
命令,以通过发出以下命令删除凭据缓存中的 Kerberos 票证
sshuser@hn0-umaspa:~$ kdestroy
使用 alicetest
运行命令 kinit
sshuser@hn0-umaspa:~$ kinit [email protected] -t alicetest.keytab
以 alicetest
身份运行 spark-submit
命令以从 Kafka 主题 alicetopic2
读取数据
spark-submit --num-executors 1 --master yarn --deploy-mode cluster --packages <list of packages the jar depends on> --repositories <repository for the dependency packages> --files alicetest_jaas.conf#alicetest_jaas.conf,alicetest.keytab#alicetest.keytab --driver-java-options "-Djava.security.auth.login.config=./alicetest_jaas.conf" --class <classname to execute in jar> --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./alicetest_jaas.conf" <path to jar> <kafkabrokerlist> <topicname> false
sshuser@hn0-umaspa:~$ spark-submit --num-executors 1 --master yarn --deploy-mode cluster --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.2.3.1.0.4-1 --repositories http://repo.hortonworks.com/content/repositories/releases/ --files alicetest_jaas.conf#alicetest_jaas.conf,alicetest.keytab#alicetest.keytab --driver-java-options "-Djava.security.auth.login.config=./alicetest_jaas.conf" --class com.cloudera.spark.examples.DirectKafkaWordCount --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./alicetest_jaas.conf" /home/sshuser/spark-secure-kafka-app/target/spark-secure-kafka-app-1.0-SNAPSHOT.jar 10.3.16.118:9092 alicetopic2 false
如果看到以下错误,则表示 DNS(域名服务器)有问题。 请务必检查 Spark 群集的 /etc/hosts
文件中的 Kafka 工作器节点条目。
Caused by: GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7))
at sun.security.jgss.krb5.Krb5Context.initSecContext(Krb5Context.java:770)
at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:248)
at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:179)
at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:192)
在 YARN UI 中访问 YARN 作业输出,可以看到 alicetest
用户能够从 alicetopic2
读取数据。 可以在输出中看到单词计数。
下面是有关如何在 YARN UI 中检查应用程序输出的详细步骤。
转到 YARN UI 并打开你的应用程序。 等待作业进入“正在运行”状态。 你将看到如下所示的应用程序详细信息。
单击“日志”。 你将看到如下所示的日志列表。
单击“stdout”。 你将看到包含 Kafka 主题中单词计数的输出。
在 Kafka 群集的 Ranger UI 中,将显示同一作业的审核日志。
在 Spark 群集中,拒绝以用户 alicetest
身份读取 Kafka 主题 bobtopic2
运行 kdestroy
命令,以通过发出以下命令删除凭据缓存中的 Kerberos 票证
sshuser@hn0-umaspa:~$ kdestroy
使用 alicetest
运行命令 kinit
sshuser@hn0-umaspa:~$ kinit [email protected] -t alicetest.keytab
以 alicetest
身份运行 spark-submit 命令以从 kafka 主题 bobtopic2
读取数据
spark-submit --num-executors 1 --master yarn --deploy-mode cluster --packages <list of packages the jar depends on> --repositories <repository for the dependency packages> --files alicetest_jaas.conf#alicetest_jaas.conf,alicetest.keytab#alicetest.keytab --driver-java-options "-Djava.security.auth.login.config=./alicetest_jaas.conf" --class <classname to execute in jar> --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./alicetest_jaas.conf" <path to jar> <kafkabrokerlist> <topicname> false
sshuser@hn0-umaspa:~$ spark-submit --num-executors 1 --master yarn --deploy-mode cluster --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.2.3.1.0.4-1 --repositories http://repo.hortonworks.com/content/repositories/releases/ --files alicetest_jaas.conf#alicetest_jaas.conf,alicetest.keytab#alicestest.keytab --driver-java-options "-Djava.security.auth.login.config=./alicetest_jaas.conf" --class com.cloudera.spark.examples.DirectKafkaWordCount --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./alicetest_jaas.conf" /home/sshuser/spark-secure-kafka-app/target/spark-secure-kafka-app-1.0-SNAPSHOT.jar 10.3.16.118:9092 bobtopic2 false
在 Yarn UI 中访问 Yarn 作业输出,可以看到 alicetest
用户无法从 bobtopic2
读取数据,并且作业失败。
在 Kafka 群集的 Ranger UI 中,将显示同一作业的审核日志。
在 Spark 群集中,允许以用户 bobadmin
身份从 Kafka 主题 alicetopic2
读取数据
运行 kdestroy
命令以删除凭据缓存中的 Kerberos 票证
sshuser@hn0-umaspa:~$ kdestroy
使用 bobadmin
运行 kinit
命令
sshuser@hn0-umaspa:~$ kinit [email protected] -t bobadmin.keytab
以 bobadmin
身份运行 spark-submit
命令以从 Kafka 主题 alicetopic2
读取数据
spark-submit --num-executors 1 --master yarn --deploy-mode cluster --packages <list of packages the jar depends on> --repositories <repository for the dependency packages> --files bobadmin_jaas.conf#bobadmin_jaas.conf,bobadmin.keytab#bobadmin.keytab --driver-java-options "-Djava.security.auth.login.config=./bobadmin_jaas.conf" --class <classname to execute in jar> --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./bobadmin_jaas.conf" <path to jar> <kafkabrokerlist> <topicname> false
spark-submit --num-executors 1 --master yarn --deploy-mode cluster --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.2.3.1.0.4-1 --repositories http://repo.hortonworks.com/content/repositories/releases/ --files bobadmin_jaas.conf#bobadmin_jaas.conf,bobadmin.keytab#bobadmin.keytab --driver-java-options "-Djava.security.auth.login.config=./bobadmin_jaas.conf" --class com.cloudera.spark.examples.DirectKafkaWordCount --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./bobadmin_jaas.conf" /home/sshuser/spark-secure-kafka-app/target/spark-secure-kafka-app-1.0-SNAPSHOT.jar wn0-umasec:9092, wn1-umasec:9092 alicetopic2 false
在 YARN UI 中访问 Yarn 作业输出,可以看到 bobadmin
用户能够从 alicetopic2
读取数据,并且输出中显示了单词计数。
在 Kafka 群集的 Ranger UI 中,将显示同一作业的审核日志。
在 Spark 群集中,允许以用户 bobadmin
身份从 Kafka 主题 bobtopic2
读取数据。
运行以下命令删除凭据缓存中的 Kerberos 票证
sshuser@hn0-umaspa:~$ kdestroy
使用 bobadmin
运行 kinit
sshuser@hn0-umaspa:~$ kinit [email protected] -t bobadmin.keytab
以 bobadmin
身份运行 spark-submit
命令以从 Kafka 主题 bobtopic2
读取数据
spark-submit --num-executors 1 --master yarn --deploy-mode cluster --packages <list of packages the jar depends on> --repositories <repository for the dependency packages> --files bobadmin_jaas.conf#bobadmin_jaas.conf,bobadmin.keytab#bobadmin.keytab --driver-java-options "-Djava.security.auth.login.config=./bobadmin_jaas.conf" --class <classname to execute in jar> --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./bobadmin_jaas.conf" <path to jar> <kafkabrokerlist> <topicname> false
spark-submit --num-executors 1 --master yarn --deploy-mode cluster --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.2.3.1.0.4-1 --repositories http://repo.hortonworks.com/content/repositories/releases/ --files bobadmin_jaas.conf#bobadmin_jaas.conf,bobadmin.keytab#bobadmin.keytab --driver-java-options "-Djava.security.auth.login.config=./bobadmin_jaas.conf" --class com.cloudera.spark.examples.DirectKafkaWordCount --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./bobadmin_jaas.conf" /home/sshuser/spark-secure-kafka-app/target/spark-secure-kafka-app-1.0-SNAPSHOT.jar wn0-umasec:9092, wn1-umasec:9092 bobtopic2 false
在 YARN UI 中访问 YARN 作业输出,可以看到 bobtest
用户能够从 bobtopic2
读取数据,并且输出中显示了单词计数。
在 Kafka 群集的 Ranger UI 中,将显示同一作业的审核日志。
为 Azure HDInsight 中的 Apache Kafka 设置 TLS 加密和身份验证