保护 Spark 和 Kafka - Spark 流式处理集成方案

本文档介绍在虚拟网络相同/已建立对等互连的情况下,如何在安全 Spark 群集中执行 Spark 作业以便从安全 Kafka 群集中的主题读取数据。

先决条件

  • 使用相同的 Microsoft Entra 域服务域和相同的 VNet 创建安全的 Kafka 群集和安全的 Spark 群集。 如果你不想在同一 VNet 中创建这两个群集,可以在两个不同的 VNet 中创建这些群集并将 VNet 对等互连。 如果你不想在同一 VNet 中创建这两个群集。
  • 如果群集位于不同的 VNet 中,请参阅通过 Azure 门户使用虚拟网络对等互连连接虚拟网络
  • 为两个用户创建密钥表。 例如,alicetestbobadmin

什么是密钥表?

密钥表是包含 Kerberos 主体和加密密钥(从 Kerberos 密码派生)对的文件。 可以使用密钥表文件通过 Kerberos 对各种远程系统进行身份验证,而无需输入密码。

有关此主题的详细信息,请参阅

  1. KTUTIL

  2. 创建 Kerberos 主体和密钥表文件

ktutil
ktutil: addent -password -p user1@TEST.COM -k 1 -e RC4-HMAC
Password for user1@TEST.COM:
ktutil: wkt user1.keytab
ktutil: q
  1. 创建从 Kafka 主题读取数据的 Spark 流式处理 Java 应用程序。 本文档使用 DirectKafkaWorkCount 示例,该示例基于 https://github.com/apache/spark/blob/branch-2.3/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java 中的 Spark 流式处理示例

方案高级演练

在 Kafka 群集上进行以下设置:

  1. 创建主题 alicetopic2bobtopic2
  2. 向主题 alicetopic2bobtopic2 生成数据
  3. 设置 Ranger 策略以允许 alicetest 用户从 alicetopic* 读取数据
  4. 设置 Ranger 策略以允许 bobadmin 用户从 * 读取数据

在 Spark 群集上执行的方案

  1. alicetest 用户身份使用 alicetopic2 中的数据。 Spark 作业将成功运行,YARN UI 中应会输出主题中的单词计数。 Kafka 群集中的 Ranger 审核记录将显示允许访问。
  2. alicetest 用户身份使用 bobtopic2 中的数据。 Spark 作业将会失败并出现错误 org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [bobtopic2]。 Kafka 群集中的 Ranger 审核记录将显示拒绝访问。
  3. bobadmin 用户身份使用 alicetopic2 中的数据。 Spark 作业将成功运行,YARN UI 中应会输出主题中的单词计数。 Kafka 群集中的 Ranger 审核记录将显示允许访问。
  4. bobadmin 用户身份使用 bobtopic2 中的数据。 Spark 作业将成功运行,YARN UI 中应会输出主题中的单词计数。 Kafka 群集中的 Ranger 审核记录将显示允许访问。

在 Kafka 群集上执行的步骤

在 Kafka 群集中,设置 Ranger 策略并根据本部分所述从 Kafka 群集生成数据

  1. 转到Kafka 群集上的 Ranger UI 并设置两个 Ranger 策略

  2. alicetest 添加 Ranger 策略,该策略通过通配符模式 alicetopic* 使用主题访问权限

  3. bobadmin 添加 Ranger 策略,该策略通过通配符模式 * 对所有主题进行各种访问

  4. 根据参数值执行以下命令

    sshuser@hn0-umasec:~$ sudo apt -y install jq 
    sshuser@hn0-umasec:~$ export clusterName='YOUR_CLUSTER_NAME'
    sshuser@hn0-umasec:~$ export TOPICNAME='YOUR_TOPIC_NAME'
    sshuser@hn0-umasec:~$ export password='YOUR_SSH_PASSWORD'
    sshuser@hn0-umasec:~$ export KAFKABROKERS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.cn/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
    sshuser@hn0-umasec:~$ export KAFKAZKHOSTS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.cn/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2);
    sshuser@hn0-umasec:~$ echo $KAFKABROKERS
    wn0-umasec.securehadooprc.partner.onmschina.cn:9092,
    wn1-umasec.securehadooprc.partner.onmschina.cn:9092
    
  5. 使用 ktutil 工具为用户 bobadmin 创建密钥表。

  6. 将此文件命名为 bobadmin.keytab

    sshuser@hn0-umasec:~$ ktutil
    ktutil: addent -password -p bobadmin@SECUREHADOOPRC.ONMICROSOFT.COM -k 1 -e RC4-HMAC 
    Password for <username>@<DOMAIN.COM>
    ktutil: wkt bobadmin.keytab 
    ktutil: q
    Kinit the created keytab
    sudo kinit bobadmin@SECUREHADOOPRC.ONMICROSOFT.COM -t bobadmin.keytab
    
  7. 创建 bobadmin_jaas.config

    KafkaClient {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      storeKey=true
      keyTab="./bobadmin.keytab"
      useTicketCache=false
      serviceName="kafka"
      principal="bobadmin@SECUREHADOOPRC.ONMICROSOFT.COM";
    };
    
  8. bobadmin 身份创建主题 alicetopic2bobtopic2

    sshuser@hn0-umasec:~$ java -jar -Djava.security.auth.login.config=bobadmin_jaas.conf kafka-producer-consumer.jar create alicetopic2 $KAFKABROKERS
    sshuser@hn0-umasec:~$ java -jar -Djava.security.auth.login.config=bobadmin_jaas.conf kafka-producer-consumer.jar create bobtopic2 $KAFKABROKERS
    
  9. bobadmin 身份向 alicetopic2 生成数据

    sshuser@hn0-umasec:~$ java -jar -Djava.security.auth.login.config=bobadmin_jaas.conf kafka-producer-consumer.jar producer alicetopic2 $KAFKABROKERS
    
  10. bobadmin 身份向 bobtopic2 生成数据

    sshuser@hn0-umasec:~$ java -jar -Djava.security.auth.login.config=bobadmin_jaas.conf kafka-producer-consumer.jar producer bobadmin2 $KAFKABROKERS
    

在 Spark 群集上执行的步骤

在 Spark 群集中,在 Spark 工作器节点中的 /etc/hosts 内添加条目,为 Kafka 工作器节点创建密钥表、jaas_config 文件,并执行 spark-submit 以提交 Spark 作业来从 Kafka 主题读取数据:

  1. 使用 sshuser 凭据通过 SSH 连接到 Spark 群集

  2. 为 Spark 群集的 /etc/hosts 中的 Kafka 工作器节点创建条目。

    备注

    为每个 Spark 节点(头节点 + 工作器节点)中的这些 Kafka 工作器节点创建条目。 可以从 Kafka 头节点的 /etc/hosts 中的 Kafka 群集获取这些详细信息。

    10.3.16.118 wn0-umasec.securehadooprc.partner.onmschina.cn wn0-umasec wn0-umasec.securehadooprc.partner.onmschina.cn. wn0-umasec.cu1cgjaim53urggr4psrgczloa.cx.internal.chinacloudapp.cn
    
    10.3.16.145 wn1-umasec.securehadooprc.partner.onmschina.cn wn1-umasec wn1-umasec.securehadooprc.partner.onmschina.cn. wn1-umasec.cu1cgjaim53urggr4psrgczloa.cx.internal.chinacloudapp.cn
    
    10.3.16.176 wn2-umasec.securehadooprc.partner.onmschina.cn wn2-umasec wn2-umasec.securehadooprc.partner.onmschina.cn. wn2-umasec.cu1cgjaim53urggr4psrgczloa.cx.internal.chinacloudapp.cn
    
  3. 使用 ktutil 工具为用户 bobadmin 创建密钥表。 将此文件命名为 bobadmin.keytab

  4. 使用 ktutil 工具为用户 alicetest 创建密钥表。 将此文件命名为 alicetest.keytab

  5. 如以下示例所示创建一个 bobadmin_jaas.conf

    KafkaClient {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      storeKey=true
      keyTab="./bobadmin.keytab"
      useTicketCache=false
      serviceName="kafka"
      principal="bobadmin@SECUREHADOOPRC.ONMICROSOFT.COM";
    };
    
  6. 如以下示例所示创建一个 alicetest_jaas.conf

    KafkaClient {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      storeKey=true
      keyTab="./alicetest.keytab"
      useTicketCache=false
      serviceName="kafka"
      principal="alicetest@SECUREHADOOPRC.ONMICROSOFT.COM";
    };
    
  7. 准备好 spark-streaming jar。

  8. 按照此处DirectKafkaWorkCount 示例和说明生成你自己的 jar,用于从 Kafka 主题读取数据

备注

为方便起见,此示例中使用的示例 jar 是按照以下步骤从 https://github.com/markgrover/spark-secure-kafka-app 生成的。

sudo apt install maven
git clone https://github.com/markgrover/spark-secure-kafka-app.git
cd spark-secure-kafka-app
mvn clean package
cd target

方案 1

在 Spark 群集中,允许以用户 alicetest 身份从 Kafka 主题 alicetopic2 读取数据

  1. 运行 kdestroy 命令,以通过发出以下命令删除凭据缓存中的 Kerberos 票证

    sshuser@hn0-umaspa:~$ kdestroy
    
  2. 使用 alicetest 运行命令 kinit

    sshuser@hn0-umaspa:~$ kinit alicetest@SECUREHADOOPRC.ONMICROSOFT.COM -t alicetest.keytab
    
  3. alicetest 身份运行 spark-submit 命令以从 Kafka 主题 alicetopic2 读取数据

    spark-submit --num-executors 1 --master yarn --deploy-mode cluster --packages <list of packages the jar depends on> --repositories <repository for the dependency packages> --files alicetest_jaas.conf#alicetest_jaas.conf,alicetest.keytab#alicetest.keytab --driver-java-options "-Djava.security.auth.login.config=./alicetest_jaas.conf" --class <classname to execute in jar> --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./alicetest_jaas.conf" <path to jar> <kafkabrokerlist> <topicname> false
    

    例如,

    sshuser@hn0-umaspa:~$ spark-submit --num-executors 1 --master yarn --deploy-mode cluster --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.2.3.1.0.4-1 --repositories http://repo.hortonworks.com/content/repositories/releases/ --files alicetest_jaas.conf#alicetest_jaas.conf,alicetest.keytab#alicetest.keytab --driver-java-options "-Djava.security.auth.login.config=./alicetest_jaas.conf" --class com.cloudera.spark.examples.DirectKafkaWordCount --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./alicetest_jaas.conf" /home/sshuser/spark-secure-kafka-app/target/spark-secure-kafka-app-1.0-SNAPSHOT.jar 10.3.16.118:9092 alicetopic2 false
    

    如果看到以下错误,则表示 DNS(域名服务器)有问题。 请务必检查 Spark 群集的 /etc/hosts 文件中的 Kafka 工作器节点条目。

    Caused by: GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7))
            at sun.security.jgss.krb5.Krb5Context.initSecContext(Krb5Context.java:770)
            at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:248)
            at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:179)
            at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:192)
    
  4. 在 YARN UI 中访问 YARN 作业输出,可以看到 alicetest 用户能够从 alicetopic2 读取数据。 可以在输出中看到单词计数。

  5. 下面是有关如何在 YARN UI 中检查应用程序输出的详细步骤。

    1. 转到 YARN UI 并打开你的应用程序。 等待作业进入“正在运行”状态。 你会看到以下应用程序详细信息。

    2. 单击“日志”。 你会看到以下日志列表。

    3. 单击“stdout”。 你将看到包含 Kafka 主题中单词计数的以下输出。

    4. 在 Kafka 群集的 Ranger UI 中,将显示相同的审核日志。

方案 2

在 Spark 群集中,拒绝以用户 alicetest 身份读取 Kafka 主题 bobtopic2

  1. 运行 kdestroy 命令,以通过发出以下命令删除凭据缓存中的 Kerberos 票证

    sshuser@hn0-umaspa:~$ kdestroy
    
  2. 使用 alicetest 运行命令 kinit

    sshuser@hn0-umaspa:~$ kinit alicetest@SECUREHADOOPRC.ONMICROSOFT.COM -t alicetest.keytab
    
  3. alicetest 身份运行 spark-submit 命令以从 kafka 主题 bobtopic2 读取数据

    spark-submit --num-executors 1 --master yarn --deploy-mode cluster --packages <list of packages the jar depends on> --repositories <repository for the dependency packages> --files alicetest_jaas.conf#alicetest_jaas.conf,alicetest.keytab#alicetest.keytab --driver-java-options "-Djava.security.auth.login.config=./alicetest_jaas.conf" --class <classname to execute in jar> --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./alicetest_jaas.conf" <path to jar> <kafkabrokerlist> <topicname> false
    

    例如,

    sshuser@hn0-umaspa:~$ spark-submit --num-executors 1 --master yarn --deploy-mode cluster --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.2.3.1.0.4-1 --repositories http://repo.hortonworks.com/content/repositories/releases/ --files alicetest_jaas.conf#alicetest_jaas.conf,alicetest.keytab#alicestest.keytab --driver-java-options "-Djava.security.auth.login.config=./alicetest_jaas.conf" --class com.cloudera.spark.examples.DirectKafkaWordCount --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./alicetest_jaas.conf" /home/sshuser/spark-secure-kafka-app/target/spark-secure-kafka-app-1.0-SNAPSHOT.jar 10.3.16.118:9092 bobtopic2 false
    
  4. 在 Yarn UI 中访问 Yarn 作业输出,可以看到 alicetest 用户无法从 bobtopic2 读取数据,并且作业失败。

  5. 在 Kafka 群集的 Ranger UI 中,将显示相同的审核日志。

方案 3

在 Spark 群集中,允许以用户 bobadmin 身份从 Kafka 主题 alicetopic2 读取数据

  1. 运行 kdestroy 命令以删除凭据缓存中的 Kerberos 票证

    sshuser@hn0-umaspa:~$ kdestroy
    
  2. 使用 bobadmin 运行 kinit 命令

    sshuser@hn0-umaspa:~$ kinit bobadmin@SECUREHADOOPRC.ONMICROSOFT.COM -t bobadmin.keytab
    
  3. bobadmin 身份运行 spark-submit 命令以从 Kafka 主题 alicetopic2 读取数据

    spark-submit --num-executors 1 --master yarn --deploy-mode cluster --packages <list of packages the jar depends on> --repositories <repository for the dependency packages> --files bobadmin_jaas.conf#bobadmin_jaas.conf,bobadmin.keytab#bobadmin.keytab --driver-java-options "-Djava.security.auth.login.config=./bobadmin_jaas.conf" --class <classname to execute in jar> --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./bobadmin_jaas.conf" <path to jar> <kafkabrokerlist> <topicname> false
    

    例如,

    spark-submit --num-executors 1 --master yarn --deploy-mode cluster --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.2.3.1.0.4-1 --repositories http://repo.hortonworks.com/content/repositories/releases/ --files bobadmin_jaas.conf#bobadmin_jaas.conf,bobadmin.keytab#bobadmin.keytab --driver-java-options "-Djava.security.auth.login.config=./bobadmin_jaas.conf" --class com.cloudera.spark.examples.DirectKafkaWordCount --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./bobadmin_jaas.conf" /home/sshuser/spark-secure-kafka-app/target/spark-secure-kafka-app-1.0-SNAPSHOT.jar wn0-umasec:9092, wn1-umasec:9092 alicetopic2 false
    
  4. 在 YARN UI 中访问 Yarn 作业输出,可以看到 bobadmin 用户能够从 alicetopic2 读取数据,并且输出中显示了单词计数。

  5. 在 Kafka 群集的 Ranger UI 中,将显示相同的审核日志。

方案 4

在 Spark 群集中,允许以用户 bobadmin 身份从 Kafka 主题 bobtopic2 读取数据。

  1. 运行以下命令删除凭据缓存中的 Kerberos 票证

    sshuser@hn0-umaspa:~$ kdestroy
    
  2. 使用 bobadmin 运行 kinit

    sshuser@hn0-umaspa:~$ kinit bobadmin@SECUREHADOOPRC.ONMICROSOFT.COM -t bobadmin.keytab
    
  3. bobadmin 身份运行 spark-submit 命令以从 Kafka 主题 bobtopic2 读取数据

    spark-submit --num-executors 1 --master yarn --deploy-mode cluster --packages <list of packages the jar depends on> --repositories <repository for the dependency packages> --files bobadmin_jaas.conf#bobadmin_jaas.conf,bobadmin.keytab#bobadmin.keytab --driver-java-options "-Djava.security.auth.login.config=./bobadmin_jaas.conf" --class <classname to execute in jar> --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./bobadmin_jaas.conf" <path to jar> <kafkabrokerlist> <topicname> false
    

    例如,

    spark-submit --num-executors 1 --master yarn --deploy-mode cluster --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.2.3.1.0.4-1 --repositories http://repo.hortonworks.com/content/repositories/releases/ --files bobadmin_jaas.conf#bobadmin_jaas.conf,bobadmin.keytab#bobadmin.keytab --driver-java-options "-Djava.security.auth.login.config=./bobadmin_jaas.conf" --class com.cloudera.spark.examples.DirectKafkaWordCount --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./bobadmin_jaas.conf" /home/sshuser/spark-secure-kafka-app/target/spark-secure-kafka-app-1.0-SNAPSHOT.jar wn0-umasec:9092, wn1-umasec:9092 bobtopic2 false
    
  4. 在 YARN UI 中访问 YARN 作业输出,可以看到 bobtest 用户能够从 bobtopic2 读取数据,并且输出中显示了单词计数。

  5. 在 Kafka 群集的 Ranger UI 中,将显示相同的审核日志。

后续步骤