添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
  • 使用相同的 Microsoft Entra 域服务域和相同的 VNet 创建安全的 Kafka 群集和安全的 Spark 群集。 如果你不想在同一 VNet 中创建这两个群集,可以在两个不同的 VNet 中创建这些群集并将 VNet 对等互连。 如果你不想在同一 VNet 中创建这两个群集。
  • 如果群集位于不同的 VNet 中,请参阅 通过 Azure 门户使用虚拟网络对等互连连接虚拟网络
  • 为两个用户创建密钥表。 例如, alicetest bobadmin
  • 什么是密钥表?

    密钥表是包含 Kerberos 主体和加密密钥(从 Kerberos 密码派生)对的文件。 可以使用密钥表文件通过 Kerberos 对各种远程系统进行身份验证,而无需输入密码。

    有关此主题的详细信息,请参阅

  • KTUTIL

  • 创建 Kerberos 主体和密钥表文件

    ktutil
    ktutil: addent -password -p [email protected] -k 1 -e RC4-HMAC
    Password for [email protected]:
    ktutil: wkt user1.keytab
    ktutil: q
    
  • 创建从 Kafka 主题读取数据的 Spark 流式处理 Java 应用程序。 本文档使用 DirectKafkaWorkCount 示例,该示例基于 https://github.com/apache/spark/blob/branch-2.3/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java 中的 Spark 流式处理示例
  • 方案高级演练

    在 Kafka 群集上进行以下设置:

  • 创建主题 alicetopic2bobtopic2
  • 向主题 alicetopic2bobtopic2 生成数据
  • 设置 Ranger 策略以允许 alicetest 用户从 alicetopic* 读取数据
  • 设置 Ranger 策略以允许 bobadmin 用户从 * 读取数据
  • 在 Spark 群集上执行的方案

  • alicetest 用户身份使用 alicetopic2 中的数据。 Spark 作业将成功运行,YARN UI 中应会输出主题中的单词计数。 Kafka 群集中的 Ranger 审核记录将显示允许访问。
  • alicetest 用户身份使用 bobtopic2 中的数据。 Spark 作业将会失败并出现错误 org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [bobtopic2]。 Kafka 群集中的 Ranger 审核记录将显示拒绝访问。
  • bobadmin 用户身份使用 alicetopic2 中的数据。 Spark 作业将成功运行,YARN UI 中应会输出主题中的单词计数。 Kafka 群集中的 Ranger 审核记录将显示允许访问。
  • bobadmin 用户身份使用 bobtopic2 中的数据。 Spark 作业将成功运行,YARN UI 中应会输出主题中的单词计数。 Kafka 群集中的 Ranger 审核记录将显示允许访问。
  • 在 Kafka 群集上执行的步骤

    在 Kafka 群集中,设置 Ranger 策略并根据本部分所述从 Kafka 群集生成数据

  • 转到Kafka 群集上的 Ranger UI 并设置两个 Ranger 策略

  • alicetest 添加 Ranger 策略,该策略通过通配符模式 alicetopic* 使用主题访问权限

  • bobadmin 添加 Ranger 策略,该策略通过通配符模式 * 对所有主题进行各种访问

  • 根据参数值执行以下命令

    sshuser@hn0-umasec:~$ sudo apt -y install jq 
    sshuser@hn0-umasec:~$ export clusterName='YOUR_CLUSTER_NAME'
    sshuser@hn0-umasec:~$ export TOPICNAME='YOUR_TOPIC_NAME'
    sshuser@hn0-umasec:~$ export password='YOUR_SSH_PASSWORD'
    sshuser@hn0-umasec:~$ export KAFKABROKERS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
    sshuser@hn0-umasec:~$ export KAFKAZKHOSTS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2);
    sshuser@hn0-umasec:~$ echo $KAFKABROKERS
    wn0-umasec.securehadooprc.onmicrosoft.com:9092,
    wn1-umasec.securehadooprc.onmicrosoft.com:9092
    
  • 使用 ktutil 工具为用户 bobadmin 创建密钥表。

  • 将此文件命名为 bobadmin.keytab

    sshuser@hn0-umasec:~$ ktutil
    ktutil: addent -password -p [email protected] -k 1 -e RC4-HMAC 
    Password for <username>@<DOMAIN.COM>
    ktutil: wkt bobadmin.keytab 
    ktutil: q
    Kinit the created keytab
    sudo kinit [email protected] -t bobadmin.keytab
    
  • 创建 bobadmin_jaas.config

    KafkaClient {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      storeKey=true
      keyTab="./bobadmin.keytab"
      useTicketCache=false
      serviceName="kafka"
      principal="[email protected]";
    
  • bobadmin 身份创建主题 alicetopic2bobtopic2

    sshuser@hn0-umasec:~$ java -jar -Djava.security.auth.login.config=bobadmin_jaas.conf kafka-producer-consumer.jar create alicetopic2 $KAFKABROKERS
    sshuser@hn0-umasec:~$ java -jar -Djava.security.auth.login.config=bobadmin_jaas.conf kafka-producer-consumer.jar create bobtopic2 $KAFKABROKERS
    
  • bobadmin 身份向 alicetopic2 生成数据

    sshuser@hn0-umasec:~$ java -jar -Djava.security.auth.login.config=bobadmin_jaas.conf kafka-producer-consumer.jar producer alicetopic2 $KAFKABROKERS
    
  • bobadmin 身份向 bobtopic2 生成数据

    sshuser@hn0-umasec:~$ java -jar -Djava.security.auth.login.config=bobadmin_jaas.conf kafka-producer-consumer.jar producer bobadmin2 $KAFKABROKERS
    

    在 Spark 群集上执行的步骤

    在 Spark 群集中,在 Spark 工作器节点中的 /etc/hosts 内添加条目,为 Kafka 工作器节点创建密钥表、jaas_config 文件,并执行 spark-submit 以提交 Spark 作业来从 Kafka 主题读取数据:

  • 使用 sshuser 凭据通过 SSH 连接到 Spark 群集

  • 为 Spark 群集的 /etc/hosts 中的 Kafka 工作器节点创建条目。

    为每个 Spark 节点(头节点 + 工作器节点)中的这些 Kafka 工作器节点创建条目。 可以从 Kafka 头节点的 /etc/hosts 中的 Kafka 群集获取这些详细信息。

    10.3.16.118 wn0-umasec.securehadooprc.onmicrosoft.com wn0-umasec wn0-umasec.securehadooprc.onmicrosoft.com. wn0-umasec.cu1cgjaim53urggr4psrgczloa.cx.internal.cloudapp.net
    10.3.16.145 wn1-umasec.securehadooprc.onmicrosoft.com wn1-umasec wn1-umasec.securehadooprc.onmicrosoft.com. wn1-umasec.cu1cgjaim53urggr4psrgczloa.cx.internal.cloudapp.net
    10.3.16.176 wn2-umasec.securehadooprc.onmicrosoft.com wn2-umasec wn2-umasec.securehadooprc.onmicrosoft.com. wn2-umasec.cu1cgjaim53urggr4psrgczloa.cx.internal.cloudapp.net
    
  • 使用 ktutil 工具为用户 bobadmin 创建密钥表。 将此文件命名为 bobadmin.keytab

  • 使用 ktutil 工具为用户 alicetest 创建密钥表。 将此文件命名为 alicetest.keytab

  • 如以下示例所示创建一个 bobadmin_jaas.conf

    KafkaClient {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      storeKey=true
      keyTab="./bobadmin.keytab"
      useTicketCache=false
      serviceName="kafka"
      principal="[email protected]";
    
  • 如以下示例所示创建一个 alicetest_jaas.conf

    KafkaClient {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      storeKey=true
      keyTab="./alicetest.keytab"
      useTicketCache=false
      serviceName="kafka"
      principal="[email protected]";
    
  • 准备好 spark-streaming jar。

  • 按照此处DirectKafkaWorkCount 示例和说明生成你自己的 jar,用于从 Kafka 主题读取数据

    为方便起见,此示例中使用的示例 jar 是按照以下步骤从 https://github.com/markgrover/spark-secure-kafka-app 生成的。

    sudo apt install maven
    git clone https://github.com/markgrover/spark-secure-kafka-app.git
    cd spark-secure-kafka-app
    mvn clean package
    cd target
    

    在 Spark 群集中,允许以用户 alicetest 身份从 Kafka 主题 alicetopic2 读取数据

  • 运行 kdestroy 命令,以通过发出以下命令删除凭据缓存中的 Kerberos 票证

    sshuser@hn0-umaspa:~$ kdestroy
    
  • 使用 alicetest 运行命令 kinit

    sshuser@hn0-umaspa:~$ kinit [email protected] -t alicetest.keytab
    
  • alicetest 身份运行 spark-submit 命令以从 Kafka 主题 alicetopic2 读取数据

    spark-submit --num-executors 1 --master yarn --deploy-mode cluster --packages <list of packages the jar depends on> --repositories <repository for the dependency packages> --files alicetest_jaas.conf#alicetest_jaas.conf,alicetest.keytab#alicetest.keytab --driver-java-options "-Djava.security.auth.login.config=./alicetest_jaas.conf" --class <classname to execute in jar> --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./alicetest_jaas.conf" <path to jar> <kafkabrokerlist> <topicname> false
    
    sshuser@hn0-umaspa:~$ spark-submit --num-executors 1 --master yarn --deploy-mode cluster --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.2.3.1.0.4-1 --repositories http://repo.hortonworks.com/content/repositories/releases/ --files alicetest_jaas.conf#alicetest_jaas.conf,alicetest.keytab#alicetest.keytab --driver-java-options "-Djava.security.auth.login.config=./alicetest_jaas.conf" --class com.cloudera.spark.examples.DirectKafkaWordCount --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./alicetest_jaas.conf" /home/sshuser/spark-secure-kafka-app/target/spark-secure-kafka-app-1.0-SNAPSHOT.jar 10.3.16.118:9092 alicetopic2 false
    

    如果看到以下错误,则表示 DNS(域名服务器)有问题。 请务必检查 Spark 群集的 /etc/hosts 文件中的 Kafka 工作器节点条目。

    Caused by: GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7))
            at sun.security.jgss.krb5.Krb5Context.initSecContext(Krb5Context.java:770)
            at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:248)
            at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:179)
            at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:192)
    
  • 在 YARN UI 中访问 YARN 作业输出,可以看到 alicetest 用户能够从 alicetopic2 读取数据。 可以在输出中看到单词计数。

  • 下面是有关如何在 YARN UI 中检查应用程序输出的详细步骤。

  • 转到 YARN UI 并打开你的应用程序。 等待作业进入“正在运行”状态。 你将看到如下所示的应用程序详细信息。

  • 单击“日志”。 你将看到如下所示的日志列表。

  • 单击“stdout”。 你将看到包含 Kafka 主题中单词计数的输出。

  • 在 Kafka 群集的 Ranger UI 中,将显示同一作业的审核日志。

    在 Spark 群集中,拒绝以用户 alicetest 身份读取 Kafka 主题 bobtopic2

  • 运行 kdestroy 命令,以通过发出以下命令删除凭据缓存中的 Kerberos 票证

    sshuser@hn0-umaspa:~$ kdestroy
    
  • 使用 alicetest 运行命令 kinit

    sshuser@hn0-umaspa:~$ kinit [email protected] -t alicetest.keytab
    
  • alicetest 身份运行 spark-submit 命令以从 kafka 主题 bobtopic2 读取数据

    spark-submit --num-executors 1 --master yarn --deploy-mode cluster --packages <list of packages the jar depends on> --repositories <repository for the dependency packages> --files alicetest_jaas.conf#alicetest_jaas.conf,alicetest.keytab#alicetest.keytab --driver-java-options "-Djava.security.auth.login.config=./alicetest_jaas.conf" --class <classname to execute in jar> --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./alicetest_jaas.conf" <path to jar> <kafkabrokerlist> <topicname> false
    
    sshuser@hn0-umaspa:~$ spark-submit --num-executors 1 --master yarn --deploy-mode cluster --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.2.3.1.0.4-1 --repositories http://repo.hortonworks.com/content/repositories/releases/ --files alicetest_jaas.conf#alicetest_jaas.conf,alicetest.keytab#alicestest.keytab --driver-java-options "-Djava.security.auth.login.config=./alicetest_jaas.conf" --class com.cloudera.spark.examples.DirectKafkaWordCount --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./alicetest_jaas.conf" /home/sshuser/spark-secure-kafka-app/target/spark-secure-kafka-app-1.0-SNAPSHOT.jar 10.3.16.118:9092 bobtopic2 false
    
  • 在 Yarn UI 中访问 Yarn 作业输出,可以看到 alicetest 用户无法从 bobtopic2 读取数据,并且作业失败。

  • 在 Kafka 群集的 Ranger UI 中,将显示同一作业的审核日志。

    在 Spark 群集中,允许以用户 bobadmin 身份从 Kafka 主题 alicetopic2 读取数据

  • 运行 kdestroy 命令以删除凭据缓存中的 Kerberos 票证

    sshuser@hn0-umaspa:~$ kdestroy
    
  • 使用 bobadmin 运行 kinit 命令

    sshuser@hn0-umaspa:~$ kinit [email protected] -t bobadmin.keytab
    
  • bobadmin 身份运行 spark-submit 命令以从 Kafka 主题 alicetopic2 读取数据

    spark-submit --num-executors 1 --master yarn --deploy-mode cluster --packages <list of packages the jar depends on> --repositories <repository for the dependency packages> --files bobadmin_jaas.conf#bobadmin_jaas.conf,bobadmin.keytab#bobadmin.keytab --driver-java-options "-Djava.security.auth.login.config=./bobadmin_jaas.conf" --class <classname to execute in jar> --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./bobadmin_jaas.conf" <path to jar> <kafkabrokerlist> <topicname> false
    
    spark-submit --num-executors 1 --master yarn --deploy-mode cluster --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.2.3.1.0.4-1 --repositories http://repo.hortonworks.com/content/repositories/releases/ --files bobadmin_jaas.conf#bobadmin_jaas.conf,bobadmin.keytab#bobadmin.keytab --driver-java-options "-Djava.security.auth.login.config=./bobadmin_jaas.conf" --class com.cloudera.spark.examples.DirectKafkaWordCount --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./bobadmin_jaas.conf" /home/sshuser/spark-secure-kafka-app/target/spark-secure-kafka-app-1.0-SNAPSHOT.jar wn0-umasec:9092, wn1-umasec:9092 alicetopic2 false
    
  • 在 YARN UI 中访问 Yarn 作业输出,可以看到 bobadmin 用户能够从 alicetopic2 读取数据,并且输出中显示了单词计数。

  • 在 Kafka 群集的 Ranger UI 中,将显示同一作业的审核日志。

    在 Spark 群集中,允许以用户 bobadmin 身份从 Kafka 主题 bobtopic2 读取数据。

  • 运行以下命令删除凭据缓存中的 Kerberos 票证

    sshuser@hn0-umaspa:~$ kdestroy
    
  • 使用 bobadmin 运行 kinit

    sshuser@hn0-umaspa:~$ kinit [email protected] -t bobadmin.keytab
    
  • bobadmin 身份运行 spark-submit 命令以从 Kafka 主题 bobtopic2 读取数据

    spark-submit --num-executors 1 --master yarn --deploy-mode cluster --packages <list of packages the jar depends on> --repositories <repository for the dependency packages> --files bobadmin_jaas.conf#bobadmin_jaas.conf,bobadmin.keytab#bobadmin.keytab --driver-java-options "-Djava.security.auth.login.config=./bobadmin_jaas.conf" --class <classname to execute in jar> --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./bobadmin_jaas.conf" <path to jar> <kafkabrokerlist> <topicname> false
    
    spark-submit --num-executors 1 --master yarn --deploy-mode cluster --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.2.3.1.0.4-1 --repositories http://repo.hortonworks.com/content/repositories/releases/ --files bobadmin_jaas.conf#bobadmin_jaas.conf,bobadmin.keytab#bobadmin.keytab --driver-java-options "-Djava.security.auth.login.config=./bobadmin_jaas.conf" --class com.cloudera.spark.examples.DirectKafkaWordCount --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./bobadmin_jaas.conf" /home/sshuser/spark-secure-kafka-app/target/spark-secure-kafka-app-1.0-SNAPSHOT.jar wn0-umasec:9092, wn1-umasec:9092 bobtopic2 false
    
  • 在 YARN UI 中访问 YARN 作业输出,可以看到 bobtest 用户能够从 bobtopic2 读取数据,并且输出中显示了单词计数。

  • 在 Kafka 群集的 Ranger UI 中,将显示同一作业的审核日志。

  • 为 Azure HDInsight 中的 Apache Kafka 设置 TLS 加密和身份验证
  •