# Secure Spark and Kafka - Spark streaming integration scenario
In this document, you learn how to run a Spark job in a secure Spark cluster that reads from a topic in a secure Kafka cluster, provided the clusters' virtual networks are the same or peered.
## Prerequisites
- Create a secure Kafka cluster and a secure Spark cluster with the same Microsoft Entra Domain Services domain and the same virtual network. If you prefer not to create both clusters in the same virtual network, create them in two separate virtual networks and peer the networks.
- If your clusters are in different virtual networks, see Connect virtual networks with virtual network peering using the Azure portal.
- Create keytabs for two users, for example `alicetest` and `bobadmin`.
A keytab is a file containing pairs of Kerberos principals and encrypted keys (derived from the Kerberos password). You can use a keytab file to authenticate to various remote systems via Kerberos without entering a password.
For more information about this topic, see `ktutil`. For example:

```shell
ktutil
ktutil: addent -password -p user1@TEST.COM -k 1 -e RC4-HMAC
Password for user1@TEST.COM:
ktutil: wkt user1.keytab
ktutil: q
```
- Create a Spark streaming Java application that reads from Kafka topics. This document uses a DirectKafkaWordCount example based on the Spark streaming examples at https://github.com/apache/spark/blob/branch-2.3/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
## Set up on the Kafka cluster
- Create topics `alicetopic2` and `bobtopic2`
- Produce data to topics `alicetopic2` and `bobtopic2`
- Set up a Ranger policy to allow the `alicetest` user to read from topics matching `alicetopic*`
- Set up a Ranger policy to allow the `bobadmin` user to read from all topics (`*`)
- Consume data from `alicetopic2` as the `alicetest` user. The Spark job runs successfully, and the count of the words in the topic is output in the YARN UI. The Ranger audit records in the Kafka cluster show that access is allowed.
- Consume data from `bobtopic2` as the `alicetest` user. The Spark job fails with `org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [bobtopic2]`. The Ranger audit records in the Kafka cluster show that access is denied.
- Consume data from `alicetopic2` as the `bobadmin` user. The Spark job runs successfully, and the count of the words in the topic is output in the YARN UI. The Ranger audit records in the Kafka cluster show that access is allowed.
- Consume data from `bobtopic2` as the `bobadmin` user. The Spark job runs successfully, and the count of the words in the topic is output in the YARN UI. The Ranger audit records in the Kafka cluster show that access is allowed.
## Set up Ranger policies and produce data on the Kafka cluster

Go to the Ranger UI on the Kafka cluster and set up two Ranger policies:
- Add a Ranger policy for `alicetest` with consume access to topics matching the wildcard pattern `alicetopic*`
- Add a Ranger policy for `bobadmin` with all accesses to all topics (wildcard pattern `*`)
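Ranger matches topic names against these policies with simple wildcard patterns. As a rough illustration of why `alicetopic2` falls under `alicetopic*` while `bobtopic2` only falls under `*`, here is a sketch using plain shell globbing as a stand-in for Ranger's matcher (the `matches` helper is hypothetical, not Ranger code):

```shell
# Stand-in for Ranger's wildcard matching (illustration only, not Ranger code).
matches() {
  case "$1" in
    $2) echo allowed ;;   # topic name fits the policy pattern
    *)  echo denied ;;
  esac
}

matches alicetopic2 'alicetopic*'   # allowed
matches bobtopic2   'alicetopic*'   # denied
matches bobtopic2   '*'             # allowed
```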
Execute the following commands based on your parameter values:

```shell
sshuser@hn0-umasec:~$ sudo apt -y install jq
sshuser@hn0-umasec:~$ export clusterName='YOUR_CLUSTER_NAME'
sshuser@hn0-umasec:~$ export TOPICNAME='YOUR_TOPIC_NAME'
sshuser@hn0-umasec:~$ export password='YOUR_SSH_PASSWORD'
sshuser@hn0-umasec:~$ export KAFKABROKERS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.cn/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
sshuser@hn0-umasec:~$ export KAFKAZKHOSTS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.cn/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2);
sshuser@hn0-umasec:~$ echo $KAFKABROKERS
wn0-umasec.securehadooprc.partner.onmschina.cn:9092,wn1-umasec.securehadooprc.partner.onmschina.cn:9092
```
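The `jq` filter above turns the Ambari API response into a comma-separated `host:9092` list, and `cut -d',' -f1,2` keeps the first two brokers. As a rough illustration of what that pipeline produces, here is the same extraction mimicked without `jq` on a made-up, simplified response (an assumed shape for illustration, not the real Ambari payload):

```shell
# Hypothetical, simplified Ambari-style response (illustration only).
response='{"host_components":[{"HostRoles":{"host_name":"wn0-test"}},{"HostRoles":{"host_name":"wn1-test"}}]}'

# Extract each host_name, append :9092, join with commas,
# then keep only the first two entries (what `cut -d',' -f1,2` does).
KAFKABROKERS=$(echo "$response" \
  | grep -o '"host_name":"[^"]*"' \
  | sed 's/"host_name":"\([^"]*\)"/\1:9092/' \
  | paste -sd',' - \
  | cut -d',' -f1,2)
echo "$KAFKABROKERS"   # wn0-test:9092,wn1-test:9092
```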
Create a keytab for user `bobadmin` using the `ktutil` tool. Let's call this file `bobadmin.keytab`:
```shell
sshuser@hn0-umasec:~$ ktutil
ktutil: addent -password -p bobadmin@SECUREHADOOPRC.ONMICROSOFT.COM -k 1 -e RC4-HMAC
Password for <username>@<DOMAIN.COM>
ktutil: wkt bobadmin.keytab
ktutil: q
```

Kinit the created keytab:

```shell
sudo kinit bobadmin@SECUREHADOOPRC.ONMICROSOFT.COM -t bobadmin.keytab
```
Create a `bobadmin_jaas.conf`:

```
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="./bobadmin.keytab"
  useTicketCache=false
  serviceName="kafka"
  principal="bobadmin@SECUREHADOOPRC.ONMICROSOFT.COM";
};
```
Create topics `alicetopic2` and `bobtopic2` as `bobadmin`:

```shell
sshuser@hn0-umasec:~$ java -jar -Djava.security.auth.login.config=bobadmin_jaas.conf kafka-producer-consumer.jar create alicetopic2 $KAFKABROKERS
sshuser@hn0-umasec:~$ java -jar -Djava.security.auth.login.config=bobadmin_jaas.conf kafka-producer-consumer.jar create bobtopic2 $KAFKABROKERS
```
Produce data to `alicetopic2` as `bobadmin`:

```shell
sshuser@hn0-umasec:~$ java -jar -Djava.security.auth.login.config=bobadmin_jaas.conf kafka-producer-consumer.jar producer alicetopic2 $KAFKABROKERS
```
Produce data to `bobtopic2` as `bobadmin`:

```shell
sshuser@hn0-umasec:~$ java -jar -Djava.security.auth.login.config=bobadmin_jaas.conf kafka-producer-consumer.jar producer bobtopic2 $KAFKABROKERS
```
## Set up the Spark cluster

In the Spark cluster, add entries in `/etc/hosts` on the Spark nodes for the Kafka worker nodes, create keytabs and JAAS config files, and run `spark-submit` to submit a Spark job that reads from the Kafka topic:
SSH into the Spark cluster with the `sshuser` credentials.
Make entries for the Kafka worker nodes in `/etc/hosts` of the Spark cluster.

Note: Make these entries on every Spark node (head nodes and worker nodes). You can get these details from the Kafka cluster, in `/etc/hosts` of the Kafka head node.
```
10.3.16.118 wn0-umasec.securehadooprc.partner.onmschina.cn wn0-umasec wn0-umasec.securehadooprc.partner.onmschina.cn. wn0-umasec.cu1cgjaim53urggr4psrgczloa.cx.internal.chinacloudapp.cn
10.3.16.145 wn1-umasec.securehadooprc.partner.onmschina.cn wn1-umasec wn1-umasec.securehadooprc.partner.onmschina.cn. wn1-umasec.cu1cgjaim53urggr4psrgczloa.cx.internal.chinacloudapp.cn
10.3.16.176 wn2-umasec.securehadooprc.partner.onmschina.cn wn2-umasec wn2-umasec.securehadooprc.partner.onmschina.cn. wn2-umasec.cu1cgjaim53urggr4psrgczloa.cx.internal.chinacloudapp.cn
```
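A quick sanity check is to confirm that a Kafka worker host name maps to the expected IP in the hosts file. The sketch below works on an illustrative `hosts.sample` copy (a hypothetical file name; on the cluster you would inspect `/etc/hosts` itself):

```shell
# Illustrative hosts file; on the cluster, check /etc/hosts directly.
cat > hosts.sample <<'EOF'
10.3.16.118 wn0-umasec.securehadooprc.partner.onmschina.cn wn0-umasec
10.3.16.145 wn1-umasec.securehadooprc.partner.onmschina.cn wn1-umasec
EOF

# Print the IP that a given host name (FQDN or short name) maps to.
lookup() { awk -v h="$1" '$2 == h || $3 == h { print $1 }' hosts.sample; }

lookup wn0-umasec   # 10.3.16.118
```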
Create a keytab for user `bobadmin` using the `ktutil` tool. Let's call this file `bobadmin.keytab`.
Create a keytab for user `alicetest` using the `ktutil` tool. Let's call this file `alicetest.keytab`.
Create a `bobadmin_jaas.conf` as shown in the following sample:

```
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="./bobadmin.keytab"
  useTicketCache=false
  serviceName="kafka"
  principal="bobadmin@SECUREHADOOPRC.ONMICROSOFT.COM";
};
```
Create an `alicetest_jaas.conf` as shown in the following sample:

```
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="./alicetest.keytab"
  useTicketCache=false
  serviceName="kafka"
  principal="alicetest@SECUREHADOOPRC.ONMICROSOFT.COM";
};
```
Get the Spark streaming jar ready. Build your own jar that reads from a Kafka topic by following the example and instructions for `DirectKafkaWordCount`.
Note: For convenience, the sample jar used in this example was built from https://github.com/markgrover/spark-secure-kafka-app by following these steps:
```shell
sudo apt install maven
git clone https://github.com/markgrover/spark-secure-kafka-app.git
cd spark-secure-kafka-app
mvn clean package
cd target
```
## From the Spark cluster, reading from Kafka topic `alicetopic2` as user `alicetest` is allowed
Run a `kdestroy` command to remove the Kerberos tickets from the credential cache:

```shell
sshuser@hn0-umaspa:~$ kdestroy
```
Run the `kinit` command with `alicetest`:

```shell
sshuser@hn0-umaspa:~$ kinit alicetest@SECUREHADOOPRC.ONMICROSOFT.COM -t alicetest.keytab
```
Run a `spark-submit` command to read from Kafka topic `alicetopic2` as `alicetest`:

```shell
spark-submit --num-executors 1 --master yarn --deploy-mode cluster --packages <list of packages the jar depends on> --repositories <repository for the dependency packages> --files alicetest_jaas.conf#alicetest_jaas.conf,alicetest.keytab#alicetest.keytab --driver-java-options "-Djava.security.auth.login.config=./alicetest_jaas.conf" --class <classname to execute in jar> --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./alicetest_jaas.conf" <path to jar> <kafkabrokerlist> <topicname> false
```
For example:

```shell
sshuser@hn0-umaspa:~$ spark-submit --num-executors 1 --master yarn --deploy-mode cluster --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.2.3.1.0.4-1 --repositories http://repo.hortonworks.com/content/repositories/releases/ --files alicetest_jaas.conf#alicetest_jaas.conf,alicetest.keytab#alicetest.keytab --driver-java-options "-Djava.security.auth.login.config=./alicetest_jaas.conf" --class com.cloudera.spark.examples.DirectKafkaWordCount --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./alicetest_jaas.conf" /home/sshuser/spark-secure-kafka-app/target/spark-secure-kafka-app-1.0-SNAPSHOT.jar 10.3.16.118:9092 alicetopic2 false
```
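The word count that this job writes to stdout is the standard per-batch split-and-count. A minimal shell sketch of the same computation on made-up sample records (illustrative input, not actual topic data):

```shell
# Two sample records, one per line (made-up data), split into words and counted.
printf 'hello kafka\nhello spark\n' \
  | tr ' ' '\n' \
  | sort \
  | uniq -c
# Prints each distinct word prefixed by its count (hello appears twice).
```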
If you see the following error, it denotes a DNS (Domain Name Server) issue. Make sure to check the Kafka worker node entries in the `/etc/hosts` file of the Spark cluster:

```
Caused by: GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7))
        at sun.security.jgss.krb5.Krb5Context.initSecContext(Krb5Context.java:770)
        at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:248)
        at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:179)
        at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:192)
```
From the YARN UI, access the YARN job output. You can see that the `alicetest` user is able to read from `alicetopic2`; the word count appears in the output. The following are the detailed steps to check the application output from the YARN UI:
- Go to the YARN UI and open your application. Wait for the job to go to the RUNNING state; you'll see the application details.
- Click Logs; you'll see the list of logs.
- Click stdout; you'll see the output with the count of words from your Kafka topic.
On the Kafka cluster's Ranger UI, audit logs for this access are shown.
## From the Spark cluster, reading Kafka topic `bobtopic2` as user `alicetest` is denied
Run a `kdestroy` command to remove the Kerberos tickets from the credential cache:

```shell
sshuser@hn0-umaspa:~$ kdestroy
```
Run the `kinit` command with `alicetest`:

```shell
sshuser@hn0-umaspa:~$ kinit alicetest@SECUREHADOOPRC.ONMICROSOFT.COM -t alicetest.keytab
```
Run a `spark-submit` command to read from Kafka topic `bobtopic2` as `alicetest`:

```shell
spark-submit --num-executors 1 --master yarn --deploy-mode cluster --packages <list of packages the jar depends on> --repositories <repository for the dependency packages> --files alicetest_jaas.conf#alicetest_jaas.conf,alicetest.keytab#alicetest.keytab --driver-java-options "-Djava.security.auth.login.config=./alicetest_jaas.conf" --class <classname to execute in jar> --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./alicetest_jaas.conf" <path to jar> <kafkabrokerlist> <topicname> false
```
For example:

```shell
sshuser@hn0-umaspa:~$ spark-submit --num-executors 1 --master yarn --deploy-mode cluster --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.2.3.1.0.4-1 --repositories http://repo.hortonworks.com/content/repositories/releases/ --files alicetest_jaas.conf#alicetest_jaas.conf,alicetest.keytab#alicetest.keytab --driver-java-options "-Djava.security.auth.login.config=./alicetest_jaas.conf" --class com.cloudera.spark.examples.DirectKafkaWordCount --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./alicetest_jaas.conf" /home/sshuser/spark-secure-kafka-app/target/spark-secure-kafka-app-1.0-SNAPSHOT.jar 10.3.16.118:9092 bobtopic2 false
```
From the YARN UI, access the YARN job output. You can see that the `alicetest` user is unable to read from `bobtopic2` and the job fails. On the Kafka cluster's Ranger UI, audit logs show that this access is denied.
## From the Spark cluster, reading from Kafka topic `alicetopic2` as user `bobadmin` is allowed
Run a `kdestroy` command to remove the Kerberos tickets from the credential cache:

```shell
sshuser@hn0-umaspa:~$ kdestroy
```
Run the `kinit` command with `bobadmin`:

```shell
sshuser@hn0-umaspa:~$ kinit bobadmin@SECUREHADOOPRC.ONMICROSOFT.COM -t bobadmin.keytab
```
Run a `spark-submit` command to read from Kafka topic `alicetopic2` as `bobadmin`:

```shell
spark-submit --num-executors 1 --master yarn --deploy-mode cluster --packages <list of packages the jar depends on> --repositories <repository for the dependency packages> --files bobadmin_jaas.conf#bobadmin_jaas.conf,bobadmin.keytab#bobadmin.keytab --driver-java-options "-Djava.security.auth.login.config=./bobadmin_jaas.conf" --class <classname to execute in jar> --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./bobadmin_jaas.conf" <path to jar> <kafkabrokerlist> <topicname> false
```
For example:

```shell
spark-submit --num-executors 1 --master yarn --deploy-mode cluster --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.2.3.1.0.4-1 --repositories http://repo.hortonworks.com/content/repositories/releases/ --files bobadmin_jaas.conf#bobadmin_jaas.conf,bobadmin.keytab#bobadmin.keytab --driver-java-options "-Djava.security.auth.login.config=./bobadmin_jaas.conf" --class com.cloudera.spark.examples.DirectKafkaWordCount --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./bobadmin_jaas.conf" /home/sshuser/spark-secure-kafka-app/target/spark-secure-kafka-app-1.0-SNAPSHOT.jar wn0-umasec:9092,wn1-umasec:9092 alicetopic2 false
```
From the YARN UI, access the YARN job output. You can see that the `bobadmin` user is able to read from `alicetopic2` and the count of words appears in the output. On the Kafka cluster's Ranger UI, audit logs show that this access is allowed.
## From the Spark cluster, reading from Kafka topic `bobtopic2` as user `bobadmin` is allowed
Remove the Kerberos tickets from the credential cache by running the following command:

```shell
sshuser@hn0-umaspa:~$ kdestroy
```
Run the `kinit` command with `bobadmin`:

```shell
sshuser@hn0-umaspa:~$ kinit bobadmin@SECUREHADOOPRC.ONMICROSOFT.COM -t bobadmin.keytab
```
Run a `spark-submit` command to read from Kafka topic `bobtopic2` as `bobadmin`:

```shell
spark-submit --num-executors 1 --master yarn --deploy-mode cluster --packages <list of packages the jar depends on> --repositories <repository for the dependency packages> --files bobadmin_jaas.conf#bobadmin_jaas.conf,bobadmin.keytab#bobadmin.keytab --driver-java-options "-Djava.security.auth.login.config=./bobadmin_jaas.conf" --class <classname to execute in jar> --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./bobadmin_jaas.conf" <path to jar> <kafkabrokerlist> <topicname> false
```
For example:

```shell
spark-submit --num-executors 1 --master yarn --deploy-mode cluster --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.2.3.1.0.4-1 --repositories http://repo.hortonworks.com/content/repositories/releases/ --files bobadmin_jaas.conf#bobadmin_jaas.conf,bobadmin.keytab#bobadmin.keytab --driver-java-options "-Djava.security.auth.login.config=./bobadmin_jaas.conf" --class com.cloudera.spark.examples.DirectKafkaWordCount --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./bobadmin_jaas.conf" /home/sshuser/spark-secure-kafka-app/target/spark-secure-kafka-app-1.0-SNAPSHOT.jar wn0-umasec:9092,wn1-umasec:9092 bobtopic2 false
```
From the YARN UI, access the YARN job output. You can see that the `bobadmin` user is able to read from `bobtopic2` and the count of words appears in the output. On the Kafka cluster's Ranger UI, audit logs show that this access is allowed.