In this document, you learn how to run a Spark job on a secure Spark cluster that reads from a topic on a secure Kafka cluster, provided that the clusters are in the same virtual network or in peered virtual networks.
Prerequisites
- Create a secure Kafka cluster and a secure Spark cluster with the same Microsoft Entra Domain Services domain and the same virtual network. If you prefer not to create both clusters in the same virtual network, create them in two separate virtual networks and peer the networks. For more information, see Connect virtual networks with virtual network peering using the Azure portal.
- Create keytabs for two users, for example, `alicetest` and `bobadmin`.
What is a keytab?
A keytab is a file containing pairs of Kerberos principals and encrypted keys (derived from the Kerberos password). You can use a keytab file to authenticate to various remote systems using Kerberos without entering a password.
For more information about this topic, see ktutil.

```bash
ktutil
ktutil: addent -password -p user1@TEST.COM -k 1 -e RC4-HMAC
Password for user1@TEST.COM:
ktutil: wkt user1.keytab
ktutil: q
```
- Create a Spark streaming Java application that reads from Kafka topics. This document uses a DirectKafkaWordCount example based on the Spark streaming examples at https://github.com/apache/spark/blob/branch-2.3/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java. A minimal sketch of such an application follows this list.
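To make this prerequisite concrete, the following is a minimal sketch of a DirectKafkaWordCount-style application, assuming the spark-streaming-kafka-0-10 integration used by the `--packages` values later in this article. The class name, consumer group id, and batch interval are illustrative assumptions; the jar built later in this article differs in details such as argument handling.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;

public final class DirectKafkaWordCountSketch {
    public static void main(String[] args) throws Exception {
        String brokers = args[0];                        // comma-separated broker list
        String topics = args[1];                         // comma-separated topic list
        boolean useSsl = Boolean.parseBoolean(args[2]);  // the trailing "false" in the examples below

        JavaStreamingContext jssc = new JavaStreamingContext(
            new SparkConf().setAppName("DirectKafkaWordCountSketch"),
            Durations.seconds(10));

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", brokers);
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "spark-word-count");  // hypothetical consumer group
        // Kerberos credentials come from the JAAS file passed to spark-submit via
        // -Djava.security.auth.login.config; only the protocol is chosen here.
        kafkaParams.put("security.protocol", useSsl ? "SASL_SSL" : "SASL_PLAINTEXT");

        JavaInputDStream<ConsumerRecord<String, String>> stream =
            KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(
                    Arrays.asList(topics.split(",")), kafkaParams));

        // Classic word count over each batch, printed to stdout (visible in the YARN logs).
        stream.map(ConsumerRecord::value)
              .flatMap(line -> Arrays.asList(line.split(" ")).iterator())
              .mapToPair(word -> new Tuple2<>(word, 1))
              .reduceByKey(Integer::sum)
              .print();

        jssc.start();
        jssc.awaitTermination();
    }
}
```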
High-level walkthrough of the scenarios
Set up on the Kafka cluster:

- Create topics `alicetopic2` and `bobtopic2`
- Produce data to topics `alicetopic2` and `bobtopic2`
- Set up a Ranger policy to allow the `alicetest` user to read from `alicetopic*`
- Set up a Ranger policy to allow the `bobadmin` user to read from `*`
Scenarios to be executed on the Spark cluster:

- Consume data from `alicetopic2` as `alicetest`. The Spark job runs successfully, and the count of the words in the topic is output in the YARN UI. The Ranger audit records in the Kafka cluster show that access is allowed.
- Consume data from `bobtopic2` as `alicetest`. The Spark job fails with `org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [bobtopic2]`. The Ranger audit records in the Kafka cluster show that access is denied.
- Consume data from `alicetopic2` as `bobadmin`. The Spark job runs successfully, and the count of the words in the topic is output in the YARN UI. The Ranger audit records show that access is allowed.
- Consume data from `bobtopic2` as `bobadmin`. The Spark job runs successfully, and the count of the words in the topic is output in the YARN UI. The Ranger audit records show that access is allowed.
Steps to be performed on the Kafka cluster
On the Kafka cluster, set up the Ranger policies and produce data, as explained in this section.
1. Go to the Ranger UI on the Kafka cluster and set up two Ranger policies:

   - Add a Ranger policy for `alicetest` with consume access to topics with the wildcard pattern `alicetopic*`
   - Add a Ranger policy for `bobadmin` with all access to all topics with the wildcard pattern `*`

2. Execute the following commands based on your parameter values:
   ```bash
   sshuser@hn0-umasec:~$ sudo apt -y install jq
   sshuser@hn0-umasec:~$ export clusterName='YOUR_CLUSTER_NAME'
   sshuser@hn0-umasec:~$ export TOPICNAME='YOUR_TOPIC_NAME'
   sshuser@hn0-umasec:~$ export password='YOUR_SSH_PASSWORD'
   sshuser@hn0-umasec:~$ export KAFKABROKERS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.cn/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
   sshuser@hn0-umasec:~$ export KAFKAZKHOSTS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.cn/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2);
   sshuser@hn0-umasec:~$ echo $KAFKABROKERS
   wn0-umasec.securehadooprc.partner.onmschina.cn:9092,wn1-umasec.securehadooprc.partner.onmschina.cn:9092
   ```

3. Create a keytab for user `bobadmin` using the `ktutil` tool. Let's call this file `bobadmin.keytab`.

   ```bash
   sshuser@hn0-umasec:~$ ktutil
   ktutil: addent -password -p bobadmin@SECUREHADOOPRC.ONMICROSOFT.COM -k 1 -e RC4-HMAC
   Password for <username>@<DOMAIN.COM>
   ktutil: wkt bobadmin.keytab
   ktutil: q
   ```

4. `kinit` the created keytab:

   ```bash
   sudo kinit bobadmin@SECUREHADOOPRC.ONMICROSOFT.COM -t bobadmin.keytab
   ```

5. Create a `bobadmin_jaas.conf`:

   ```
   KafkaClient {
     com.sun.security.auth.module.Krb5LoginModule required
     useKeyTab=true
     storeKey=true
     keyTab="./bobadmin.keytab"
     useTicketCache=false
     serviceName="kafka"
     principal="bobadmin@SECUREHADOOPRC.ONMICROSOFT.COM";
   };
   ```
6. Create the topics `alicetopic2` and `bobtopic2` as `bobadmin` (an AdminClient-based sketch of this step follows this procedure):

   ```bash
   sshuser@hn0-umasec:~$ java -jar -Djava.security.auth.login.config=bobadmin_jaas.conf kafka-producer-consumer.jar create alicetopic2 $KAFKABROKERS
   sshuser@hn0-umasec:~$ java -jar -Djava.security.auth.login.config=bobadmin_jaas.conf kafka-producer-consumer.jar create bobtopic2 $KAFKABROKERS
   ```

7. Produce data to `alicetopic2` as `bobadmin`:

   ```bash
   sshuser@hn0-umasec:~$ java -jar -Djava.security.auth.login.config=bobadmin_jaas.conf kafka-producer-consumer.jar producer alicetopic2 $KAFKABROKERS
   ```

8. Produce data to `bobtopic2` as `bobadmin`:

   ```bash
   sshuser@hn0-umasec:~$ java -jar -Djava.security.auth.login.config=bobadmin_jaas.conf kafka-producer-consumer.jar producer bobtopic2 $KAFKABROKERS
   ```
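If you prefer to see the topic-creation step as code rather than the sample jar, the following is a minimal sketch using the kafka-clients `AdminClient`, assuming the `bobadmin_jaas.conf` and `bobadmin.keytab` from the steps above are in the working directory. The partition and replication values are illustrative assumptions, not values from this article.

```java
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public final class CreateTopicsSketch {
    public static void main(String[] args) throws Exception {
        // Point the JVM at the JAAS file; equivalent to the
        // -Djava.security.auth.login.config flag used with the sample jar.
        System.setProperty("java.security.auth.login.config", "./bobadmin_jaas.conf");

        Properties props = new Properties();
        props.put("bootstrap.servers", args[0]);           // e.g. the $KAFKABROKERS value
        props.put("security.protocol", "SASL_PLAINTEXT");  // Kerberos, as in this walkthrough

        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions / replication factor 3 are assumed values for illustration.
            admin.createTopics(Arrays.asList(
                new NewTopic("alicetopic2", 3, (short) 3),
                new NewTopic("bobtopic2", 3, (short) 3))).all().get();
        }
    }
}
```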
Setup steps to be performed on the Spark cluster
On the Spark cluster, add entries in `/etc/hosts` on the Spark worker nodes for the Kafka worker nodes, create keytabs and JAAS config files, and do a spark-submit to submit a Spark job that reads from the Kafka topic:

1. SSH into the Spark cluster with the sshuser credentials.
2. Make entries for the Kafka worker nodes in `/etc/hosts` of the Spark cluster.

   Note

   Make the entry for these Kafka worker nodes on every Spark node (head nodes + worker nodes). You can get these details from the Kafka cluster, in `/etc/hosts` of the Kafka head node. A quick hostname-resolution check follows this procedure.

   ```
   10.3.16.118 wn0-umasec.securehadooprc.partner.onmschina.cn wn0-umasec wn0-umasec.securehadooprc.partner.onmschina.cn. wn0-umasec.cu1cgjaim53urggr4psrgczloa.cx.internal.chinacloudapp.cn
   10.3.16.145 wn1-umasec.securehadooprc.partner.onmschina.cn wn1-umasec wn1-umasec.securehadooprc.partner.onmschina.cn. wn1-umasec.cu1cgjaim53urggr4psrgczloa.cx.internal.chinacloudapp.cn
   10.3.16.176 wn2-umasec.securehadooprc.partner.onmschina.cn wn2-umasec wn2-umasec.securehadooprc.partner.onmschina.cn. wn2-umasec.cu1cgjaim53urggr4psrgczloa.cx.internal.chinacloudapp.cn
   ```
3. Create a keytab for user `bobadmin` using the `ktutil` tool. Let's call this file `bobadmin.keytab`.

4. Create a keytab for user `alicetest` using the `ktutil` tool. Let's call this file `alicetest.keytab`.

5. Create a `bobadmin_jaas.conf` as shown in the following sample:

   ```
   KafkaClient {
     com.sun.security.auth.module.Krb5LoginModule required
     useKeyTab=true
     storeKey=true
     keyTab="./bobadmin.keytab"
     useTicketCache=false
     serviceName="kafka"
     principal="bobadmin@SECUREHADOOPRC.ONMICROSOFT.COM";
   };
   ```

6. Create an `alicetest_jaas.conf` as shown in the following sample:

   ```
   KafkaClient {
     com.sun.security.auth.module.Krb5LoginModule required
     useKeyTab=true
     storeKey=true
     keyTab="./alicetest.keytab"
     useTicketCache=false
     serviceName="kafka"
     principal="alicetest@SECUREHADOOPRC.ONMICROSOFT.COM";
   };
   ```

7. Get the spark-streaming jar ready.
   Build your own jar that reads from a Kafka topic by following the example and instructions here for DirectKafkaWordCount.
   Note

   For convenience, the sample jar used in this example was built from https://github.com/markgrover/spark-secure-kafka-app by following these steps:

   ```bash
   sudo apt install maven
   git clone https://github.com/markgrover/spark-secure-kafka-app.git
   cd spark-secure-kafka-app
   mvn clean package
   cd target
   ```
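Before you run the scenarios, you can verify that the `/etc/hosts` entries from step 2 resolve on a Spark node. The following is a quick, hypothetical Java check (not part of the original walkthrough); a resolution failure here typically surfaces later as the `Server not found in Kerberos database` error shown in scenario 1.

```java
import java.net.InetAddress;

// Resolves each Kafka worker FQDN passed on the command line and prints its address.
public final class HostsCheck {
    public static void main(String[] args) throws Exception {
        for (String host : args) {
            System.out.println(host + " -> " + InetAddress.getByName(host).getHostAddress());
        }
    }
}
```

For example, `java HostsCheck wn0-umasec.securehadooprc.partner.onmschina.cn` should print the 10.3.16.118 address added to `/etc/hosts` earlier.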
Scenario 1
From the Spark cluster, reading from the Kafka topic `alicetopic2` as user `alicetest` is allowed.
1. Run a `kdestroy` command to remove the Kerberos tickets from the credential cache:

   ```bash
   sshuser@hn0-umaspa:~$ kdestroy
   ```

2. Run the `kinit` command with `alicetest`:

   ```bash
   sshuser@hn0-umaspa:~$ kinit alicetest@SECUREHADOOPRC.ONMICROSOFT.COM -t alicetest.keytab
   ```

3. Run a `spark-submit` command to read from the Kafka topic `alicetopic2` as `alicetest`:

   ```bash
   spark-submit --num-executors 1 --master yarn --deploy-mode cluster --packages <list of packages the jar depends on> --repositories <repository for the dependency packages> --files alicetest_jaas.conf#alicetest_jaas.conf,alicetest.keytab#alicetest.keytab --driver-java-options "-Djava.security.auth.login.config=./alicetest_jaas.conf" --class <classname to execute in jar> --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./alicetest_jaas.conf" <path to jar> <kafkabrokerlist> <topicname> false
   ```

   For example:

   ```bash
   sshuser@hn0-umaspa:~$ spark-submit --num-executors 1 --master yarn --deploy-mode cluster --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.2.3.1.0.4-1 --repositories http://repo.hortonworks.com/content/repositories/releases/ --files alicetest_jaas.conf#alicetest_jaas.conf,alicetest.keytab#alicetest.keytab --driver-java-options "-Djava.security.auth.login.config=./alicetest_jaas.conf" --class com.cloudera.spark.examples.DirectKafkaWordCount --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./alicetest_jaas.conf" /home/sshuser/spark-secure-kafka-app/target/spark-secure-kafka-app-1.0-SNAPSHOT.jar 10.3.16.118:9092 alicetopic2 false
   ```

   If you see the following error, it indicates a DNS issue. Make sure to check the Kafka worker node entries in the `/etc/hosts` file on the Spark cluster:

   ```
   Caused by: GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7))
   at sun.security.jgss.krb5.Krb5Context.initSecContext(Krb5Context.java:770)
   at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:248)
   at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:179)
   at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:192)
   ```

4. From the YARN UI, access the YARN job output. You can see that the `alicetest` user is able to read from `alicetopic2`; the word count appears in the output.

The following are the detailed steps to check the application output from the YARN UI:

1. Go to the YARN UI and open your application. Wait for the job to go to the RUNNING state. You'll see the application details.
2. Click Logs. You'll see the list of logs.
3. Click stdout. You'll see the output with the count of words from your Kafka topic.

On the Kafka cluster's Ranger UI, the audit logs for this access are shown.
Scenario 2
From the Spark cluster, reading from the Kafka topic `bobtopic2` as user `alicetest` is denied.
1. Run a `kdestroy` command to remove the Kerberos tickets from the credential cache:

   ```bash
   sshuser@hn0-umaspa:~$ kdestroy
   ```

2. Run the `kinit` command with `alicetest`:

   ```bash
   sshuser@hn0-umaspa:~$ kinit alicetest@SECUREHADOOPRC.ONMICROSOFT.COM -t alicetest.keytab
   ```

3. Run a `spark-submit` command to read from the Kafka topic `bobtopic2` as `alicetest`:

   ```bash
   spark-submit --num-executors 1 --master yarn --deploy-mode cluster --packages <list of packages the jar depends on> --repositories <repository for the dependency packages> --files alicetest_jaas.conf#alicetest_jaas.conf,alicetest.keytab#alicetest.keytab --driver-java-options "-Djava.security.auth.login.config=./alicetest_jaas.conf" --class <classname to execute in jar> --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./alicetest_jaas.conf" <path to jar> <kafkabrokerlist> <topicname> false
   ```

   For example:

   ```bash
   sshuser@hn0-umaspa:~$ spark-submit --num-executors 1 --master yarn --deploy-mode cluster --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.2.3.1.0.4-1 --repositories http://repo.hortonworks.com/content/repositories/releases/ --files alicetest_jaas.conf#alicetest_jaas.conf,alicetest.keytab#alicetest.keytab --driver-java-options "-Djava.security.auth.login.config=./alicetest_jaas.conf" --class com.cloudera.spark.examples.DirectKafkaWordCount --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./alicetest_jaas.conf" /home/sshuser/spark-secure-kafka-app/target/spark-secure-kafka-app-1.0-SNAPSHOT.jar 10.3.16.118:9092 bobtopic2 false
   ```

4. From the YARN UI, access the YARN job output. You can see that the `alicetest` user is unable to read from `bobtopic2` and that the job fails.

On the Kafka cluster's Ranger UI, the audit logs for this denied access are shown.
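The denial in this scenario surfaces as a `TopicAuthorizationException`. If you want to reproduce it outside Spark, the following is a minimal sketch using a plain kafka-clients consumer, assuming the `alicetest_jaas.conf` and `alicetest.keytab` created earlier; the consumer group id is a hypothetical value, and with Ranger the group itself may also need read access.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.TopicAuthorizationException;

public final class AuthorizationProbe {
    public static void main(String[] args) {
        // Authenticate as alicetest via the JAAS file and keytab from the setup steps.
        System.setProperty("java.security.auth.login.config", "./alicetest_jaas.conf");

        Properties props = new Properties();
        props.put("bootstrap.servers", args[0]);           // broker list
        props.put("group.id", "authz-probe");              // hypothetical consumer group
        props.put("security.protocol", "SASL_PLAINTEXT");  // Kerberos, as in this walkthrough
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(args[1]));  // e.g. bobtopic2
            consumer.poll(Duration.ofSeconds(10));
            System.out.println("Access allowed; Ranger records an allow audit entry.");
        } catch (TopicAuthorizationException e) {
            // The same failure the Spark job hits in this scenario.
            System.err.println("Not authorized to access topics: " + e.unauthorizedTopics());
        }
    }
}
```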
Scenario 3
From the Spark cluster, reading from the Kafka topic `alicetopic2` as user `bobadmin` is allowed.
1. Run a `kdestroy` command to remove the Kerberos tickets from the credential cache:

   ```bash
   sshuser@hn0-umaspa:~$ kdestroy
   ```

2. Run the `kinit` command with `bobadmin`:

   ```bash
   sshuser@hn0-umaspa:~$ kinit bobadmin@SECUREHADOOPRC.ONMICROSOFT.COM -t bobadmin.keytab
   ```

3. Run a `spark-submit` command to read from the Kafka topic `alicetopic2` as `bobadmin`:

   ```bash
   spark-submit --num-executors 1 --master yarn --deploy-mode cluster --packages <list of packages the jar depends on> --repositories <repository for the dependency packages> --files bobadmin_jaas.conf#bobadmin_jaas.conf,bobadmin.keytab#bobadmin.keytab --driver-java-options "-Djava.security.auth.login.config=./bobadmin_jaas.conf" --class <classname to execute in jar> --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./bobadmin_jaas.conf" <path to jar> <kafkabrokerlist> <topicname> false
   ```

   For example:

   ```bash
   spark-submit --num-executors 1 --master yarn --deploy-mode cluster --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.2.3.1.0.4-1 --repositories http://repo.hortonworks.com/content/repositories/releases/ --files bobadmin_jaas.conf#bobadmin_jaas.conf,bobadmin.keytab#bobadmin.keytab --driver-java-options "-Djava.security.auth.login.config=./bobadmin_jaas.conf" --class com.cloudera.spark.examples.DirectKafkaWordCount --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./bobadmin_jaas.conf" /home/sshuser/spark-secure-kafka-app/target/spark-secure-kafka-app-1.0-SNAPSHOT.jar wn0-umasec:9092,wn1-umasec:9092 alicetopic2 false
   ```

4. From the YARN UI, access the YARN job output. You can see that the `bobadmin` user is able to read from `alicetopic2`; the count of words appears in the output.

On the Kafka cluster's Ranger UI, the audit logs for this access are shown.
Scenario 4
From the Spark cluster, reading from the Kafka topic `bobtopic2` as user `bobadmin` is allowed.
1. Remove the Kerberos tickets from the credential cache by running the following command:

   ```bash
   sshuser@hn0-umaspa:~$ kdestroy
   ```

2. Run `kinit` with `bobadmin`:

   ```bash
   sshuser@hn0-umaspa:~$ kinit bobadmin@SECUREHADOOPRC.ONMICROSOFT.COM -t bobadmin.keytab
   ```

3. Run a `spark-submit` command to read from the Kafka topic `bobtopic2` as `bobadmin`:

   ```bash
   spark-submit --num-executors 1 --master yarn --deploy-mode cluster --packages <list of packages the jar depends on> --repositories <repository for the dependency packages> --files bobadmin_jaas.conf#bobadmin_jaas.conf,bobadmin.keytab#bobadmin.keytab --driver-java-options "-Djava.security.auth.login.config=./bobadmin_jaas.conf" --class <classname to execute in jar> --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./bobadmin_jaas.conf" <path to jar> <kafkabrokerlist> <topicname> false
   ```

   For example:

   ```bash
   spark-submit --num-executors 1 --master yarn --deploy-mode cluster --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.2.3.1.0.4-1 --repositories http://repo.hortonworks.com/content/repositories/releases/ --files bobadmin_jaas.conf#bobadmin_jaas.conf,bobadmin.keytab#bobadmin.keytab --driver-java-options "-Djava.security.auth.login.config=./bobadmin_jaas.conf" --class com.cloudera.spark.examples.DirectKafkaWordCount --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./bobadmin_jaas.conf" /home/sshuser/spark-secure-kafka-app/target/spark-secure-kafka-app-1.0-SNAPSHOT.jar wn0-umasec:9092,wn1-umasec:9092 bobtopic2 false
   ```

4. From the YARN UI, access the YARN job output. You can see that the `bobadmin` user is able to read from `bobtopic2`; the count of words appears in the output.

On the Kafka cluster's Ranger UI, the audit logs for this access are shown.