Tutorial: Use Apache Spark Structured Streaming with Apache Kafka on HDInsight

This tutorial demonstrates how to use Apache Spark Structured Streaming to read and write data with Apache Kafka on Azure HDInsight.

Spark Structured Streaming is a stream processing engine built on Spark SQL. It allows you to express streaming computations the same way as batch computations on static data.

In this tutorial, you learn how to:

  • Use an Azure Resource Manager template to create clusters
  • Use Spark Structured Streaming with Kafka

When you are done with the steps in this document, remember to delete the clusters to avoid excess charges.

Prerequisites

Important

The steps in this document require an Azure resource group that contains both a Spark on HDInsight and a Kafka on HDInsight cluster. These clusters are both located within an Azure Virtual Network, which allows the Spark cluster to communicate directly with the Kafka cluster.

For your convenience, this document links to a template that can create all the required Azure resources.

For more information on using HDInsight in a virtual network, see the Plan a virtual network for HDInsight document.

Structured Streaming with Apache Kafka

Spark Structured Streaming is a stream processing engine built on the Spark SQL engine. When using Structured Streaming, you can write streaming queries the same way that you write batch queries.

The following code snippets demonstrate reading from Kafka and storing to file. The first one is a batch operation, while the second one is a streaming operation:

// Read a batch from Kafka
val kafkaDF = spark.read.format("kafka")
                .option("kafka.bootstrap.servers", kafkaBrokers)
                .option("subscribe", kafkaTopic)
                .option("startingOffsets", "earliest")
                .load()
// Select data and write to file
kafkaDF.select(from_json(col("value").cast("string"), schema) as "trip")
                .write
                .format("parquet")
                .option("path","/example/batchtripdata")
                .option("checkpointLocation", "/batchcheckpoint")
                .save()
// Stream from Kafka
val kafkaStreamDF = spark.readStream.format("kafka")
                .option("kafka.bootstrap.servers", kafkaBrokers)
                .option("subscribe", kafkaTopic)
                .option("startingOffsets", "earliest")
                .load()
// Select data from the stream and write to file
kafkaStreamDF.select(from_json(col("value").cast("string"), schema) as "trip")
                .writeStream
                .format("parquet")
                .option("path","/example/streamingtripdata")
                .option("checkpointLocation", "/streamcheckpoint")
                .start.awaitTermination(30000)

In both snippets, data is read from Kafka and written to file. The differences between the examples are:

Batch   Streaming
read    readStream
write   writeStream
save    start

The streaming operation also uses awaitTermination(30000), which waits up to 30,000 ms for the streaming query to terminate before the call returns.
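
If you want the streaming query to be stopped explicitly rather than left running after the wait returns, one option is to keep the StreamingQuery handle that start() returns. The following is a minimal sketch, not part of the tutorial notebook; it assumes kafkaStreamDF and schema are defined as in the snippets above:

// Sketch: keep a handle to the streaming query so it can be stopped explicitly.
import org.apache.spark.sql.functions.{col, from_json}

val streamingQuery = kafkaStreamDF
                .select(from_json(col("value").cast("string"), schema) as "trip")
                .writeStream
                .format("parquet")
                .option("path", "/example/streamingtripdata")
                .option("checkpointLocation", "/streamcheckpoint")
                .start()

// Wait up to 30,000 ms for the query to terminate on its own...
streamingQuery.awaitTermination(30000)
// ...then stop it explicitly if it is still running.
if (streamingQuery.isActive) streamingQuery.stop()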

To use Structured Streaming with Kafka, your project must have a dependency on the org.apache.spark : spark-sql-kafka-0-10_2.11 package. The version of this package should match the version of Spark on HDInsight. For Spark 2.2.0 (available in HDInsight 3.6), you can find the dependency information for different project types at https://search.maven.org/#artifactdetails%7Corg.apache.spark%7Cspark-sql-kafka-0-10_2.11%7C2.2.0%7Cjar.

For the Jupyter Notebook used with this tutorial, the following cell loads this package dependency. (If you build a standalone project instead of using Jupyter, a build-definition sketch follows the cell.)

%%configure -f
{
    "conf": {
        "spark.jars.packages": "org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0",
        "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.11"
    }
}
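
If you package the application yourself, the same dependency can be declared in your build definition. A minimal build.sbt sketch, assuming an sbt project that targets Scala 2.11 and Spark 2.2.0:

// build.sbt (sketch): the Kafka source for Structured Streaming,
// matching Spark 2.2.0 as provided by HDInsight 3.6.
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.2.0" % "provided",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.2.0"
)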

Create the clusters

Apache Kafka on HDInsight does not provide access to the Kafka brokers over the public internet. Anything that uses Kafka must be in the same Azure virtual network. In this tutorial, both the Kafka and Spark clusters are located in the same Azure virtual network.

The following diagram shows how communication flows between Spark and Kafka:

Diagram of the Spark and Kafka clusters in an Azure virtual network

Note

The Kafka service is limited to communication within the virtual network. Other services on the cluster, such as SSH and Ambari, can be accessed over the internet. For more information on the public ports available with HDInsight, see Ports and URIs used by HDInsight.

To create an Azure Virtual Network, and then create the Kafka and Spark clusters within it, use the following steps:

  1. Use the following button to sign in to Azure and open the template in the Azure portal.

    Deploy to Azure

    The Azure Resource Manager template is located at https://raw.githubusercontent.com/Azure-Samples/hdinsight-spark-kafka-structured-streaming/master/azuredeploy.json.

    This template creates the following resources:

    • A Kafka on HDInsight 3.6 cluster.

    • A Spark 2.2.0 on HDInsight 3.6 cluster.

    • An Azure Virtual Network, which contains the HDInsight clusters.

      Important

      The Structured Streaming notebook used in this tutorial requires Spark 2.2.0 on HDInsight 3.6. If you use an earlier version of Spark on HDInsight, you receive errors when using the notebook.

  2. Use the following information to populate the entries on the Customized template section:

    Setting                   Value
    Subscription              Your Azure subscription
    Resource group            The resource group that contains the resources.
    Location                  The Azure region that the resources are created in.
    Spark Cluster Name        The name of the Spark cluster. The first six characters must be different than the Kafka cluster name.
    Kafka Cluster Name        The name of the Kafka cluster. The first six characters must be different than the Spark cluster name.
    Cluster Login User Name   The admin user name for the clusters.
    Cluster Login Password    The admin user password for the clusters.
    SSH User Name             The SSH user to create for the clusters.
    SSH Password              The password for the SSH user.

    Screenshot of the customized template

  3. Read the Terms and Conditions, and then select I agree to the terms and conditions stated above.

  4. Select Purchase.

Note

It can take up to 20 minutes to create the clusters.

Use Spark Structured Streaming

This example demonstrates how to use Spark Structured Streaming with Kafka on HDInsight. It uses data on taxi trips, which is provided by New York City. The data set used by this notebook is from 2016 Green Taxi Trip Data.

  1. Gather host information. Use the curl and jq commands below to obtain your Kafka ZooKeeper and broker host information. The commands are designed for a Windows command prompt; slight variations are needed for other environments (a bash variant for Linux or macOS shells is sketched after the commands). Replace KafkaCluster with the name of your Kafka cluster, and KafkaPassword with the cluster login password. Also, replace C:\HDI\jq-win64.exe with the actual path to your jq installation. Enter the commands in a Windows command prompt and save the output for use in later steps.

    set CLUSTERNAME=KafkaCluster
    set PASSWORD=KafkaPassword
    
    curl -u admin:%PASSWORD% -G "https://%CLUSTERNAME%.azurehdinsight.cn/api/v1/clusters/%CLUSTERNAME%/services/ZOOKEEPER/components/ZOOKEEPER_SERVER" | C:\HDI\jq-win64.exe -r "["""\(.host_components[].HostRoles.host_name):2181"""] | join(""",""")"
    
    curl -u admin:%PASSWORD% -G "https://%CLUSTERNAME%.azurehdinsight.cn/api/v1/clusters/%CLUSTERNAME%/services/KAFKA/components/KAFKA_BROKER" | C:\HDI\jq-win64.exe -r "["""\(.host_components[].HostRoles.host_name):9092"""] | join(""",""")"
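
    If you are working from a Linux or macOS shell instead, the equivalent commands are sketched below. This sketch assumes curl and jq are on your PATH and that the cluster login user is admin, as above:

    export CLUSTERNAME=KafkaCluster
    export PASSWORD=KafkaPassword

    # Zookeeper hosts (port 2181)
    curl -sS -u admin:$PASSWORD -G "https://$CLUSTERNAME.azurehdinsight.cn/api/v1/clusters/$CLUSTERNAME/services/ZOOKEEPER/components/ZOOKEEPER_SERVER" \
      | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")'

    # Kafka broker hosts (port 9092)
    curl -sS -u admin:$PASSWORD -G "https://$CLUSTERNAME.azurehdinsight.cn/api/v1/clusters/$CLUSTERNAME/services/KAFKA/components/KAFKA_BROKER" \
      | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")'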
    
  2. In your web browser, connect to the Jupyter notebook on your Spark cluster. In the following URL, replace CLUSTERNAME with the name of your Spark cluster:

     https://CLUSTERNAME.azurehdinsight.cn/jupyter
    

    When prompted, enter the cluster login (admin) and password used when you created the cluster.

  3. Select New > Spark to create a notebook.

  4. Spark streaming has microbatching, which means data comes in batches and executors run on those batches of data. If the executor idle timeout is less than the time it takes to process a batch, executors are constantly added and removed. If the executor idle timeout is greater than the batch duration, the executor is never removed. Hence, we recommend that you disable dynamic allocation by setting spark.dynamicAllocation.enabled to false when running streaming applications.

    Load the packages used by the notebook by entering the following information in a notebook cell. Run the command by using CTRL + ENTER.

    %%configure -f
    {
        "conf": {
            "spark.jars.packages": "org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0",
            "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.11",
            "spark.dynamicAllocation.enabled": false
        }
    }
    
  5. Create the Kafka topic. Edit the command below by replacing YOUR_ZOOKEEPER_HOSTS with the Zookeeper host information extracted in the first step. Enter the edited command in your Jupyter Notebook to create the tripdata topic.

    %%bash
    export KafkaZookeepers="YOUR_ZOOKEEPER_HOSTS"
    
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic tripdata --zookeeper $KafkaZookeepers
    
  6. Retrieve data on taxi trips. Enter the command in the next cell to load data on taxi trips in New York City. The data is loaded into a dataframe, and then the dataframe is displayed as the cell output.

    import spark.implicits._
    
    // Load the data from the New York City Taxi data REST API for 2016 Green Taxi Trip Data
    val url="https://data.cityofnewyork.us/resource/pqfs-mqru.json"
    val result = scala.io.Source.fromURL(url).mkString
    
    // Create a dataframe from the JSON data
    val taxiDF = spark.read.json(Seq(result).toDS)
    
    // Display the dataframe containing trip data
    taxiDF.show()
    
  7. Set the Kafka broker hosts information. Replace YOUR_KAFKA_BROKER_HOSTS with the broker host information you extracted in step 1. Enter the edited command in the next Jupyter Notebook cell.

    // The Kafka broker hosts and topic used to write to Kafka
    val kafkaBrokers="YOUR_KAFKA_BROKER_HOSTS"
    val kafkaTopic="tripdata"
    
    println("Finished setting Kafka broker and topic configuration.")
    
  8. Send the data to Kafka. In the following command, the vendorid field is used as the key value for the Kafka message. The key is used by Kafka when partitioning data. All of the fields are stored in the Kafka message as a JSON string value. Enter the following command in Jupyter to save the data to Kafka using a batch query. (An optional console-consumer check is sketched after the cell.)

    // Select the vendorid as the key and save the JSON string as the value.
    val query = taxiDF.selectExpr("CAST(vendorid AS STRING) as key", "to_json(struct(*)) AS value").write.format("kafka").option("kafka.bootstrap.servers", kafkaBrokers).option("topic", kafkaTopic).save()
    
    println("Data sent to Kafka")
    
  9. Declare a schema. The following command demonstrates how to use a schema when reading JSON data from Kafka. Enter the command in your next Jupyter cell.

    // Import bits used for declaring schemas and working with JSON data
    import org.apache.spark.sql._
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.functions._
    
    // Define a schema for the data
    val schema = (new StructType).add("dropoff_latitude", StringType).add("dropoff_longitude", StringType).add("extra", StringType).add("fare_amount", StringType).add("improvement_surcharge", StringType).add("lpep_dropoff_datetime", StringType).add("lpep_pickup_datetime", StringType).add("mta_tax", StringType).add("passenger_count", StringType).add("payment_type", StringType).add("pickup_latitude", StringType).add("pickup_longitude", StringType).add("ratecodeid", StringType).add("store_and_fwd_flag", StringType).add("tip_amount", StringType).add("tolls_amount", StringType).add("total_amount", StringType).add("trip_distance", StringType).add("trip_type", StringType).add("vendorid", StringType)
    // Reproduced here for readability
    //val schema = (new StructType)
    //   .add("dropoff_latitude", StringType)
    //   .add("dropoff_longitude", StringType)
    //   .add("extra", StringType)
    //   .add("fare_amount", StringType)
    //   .add("improvement_surcharge", StringType)
    //   .add("lpep_dropoff_datetime", StringType)
    //   .add("lpep_pickup_datetime", StringType)
    //   .add("mta_tax", StringType)
    //   .add("passenger_count", StringType)
    //   .add("payment_type", StringType)
    //   .add("pickup_latitude", StringType)
    //   .add("pickup_longitude", StringType)
    //   .add("ratecodeid", StringType)
    //   .add("store_and_fwd_flag", StringType)
    //   .add("tip_amount", StringType)
    //   .add("tolls_amount", StringType)
    //   .add("total_amount", StringType)
    //   .add("trip_distance", StringType)
    //   .add("trip_type", StringType)
    //   .add("vendorid", StringType)
    
    println("Schema declared")
    
  10. Select data and start the stream. The following command demonstrates how to retrieve data from Kafka using a batch query, and then write the results out to HDFS on the Spark cluster. In this example, the select retrieves the message (value field) from Kafka and applies the schema to it. The data is then written to HDFS (WASB or ADL) in parquet format. Enter the command in your next Jupyter cell.

    // Read a batch from Kafka
    val kafkaDF = spark.read.format("kafka").option("kafka.bootstrap.servers", kafkaBrokers).option("subscribe", kafkaTopic).option("startingOffsets", "earliest").load()
    
    // Select data and write to file
    val query = kafkaDF.select(from_json(col("value").cast("string"), schema) as "trip").write.format("parquet").option("path","/example/batchtripdata").option("checkpointLocation", "/batchcheckpoint").save()
    
    println("Wrote data to file")
    
  11. You can verify that the files were created by entering the command in your next Jupyter cell. It lists the files in the /example/batchtripdata directory.

    %%bash
    hdfs dfs -ls /example/batchtripdata
    
  12. While the previous example used a batch query, the following command demonstrates how to do the same thing using a streaming query. Enter the command in your next Jupyter cell.

    // Stream from Kafka
    val kafkaStreamDF = spark.readStream.format("kafka").option("kafka.bootstrap.servers", kafkaBrokers).option("subscribe", kafkaTopic).option("startingOffsets", "earliest").load()
    
    // Select data from the stream and write to file
    kafkaStreamDF.select(from_json(col("value").cast("string"), schema) as "trip").writeStream.format("parquet").option("path","/example/streamingtripdata").option("checkpointLocation", "/streamcheckpoint").start.awaitTermination(30000)
    println("Wrote data to file")
    
  13. Run the following cell to verify that the files were written by the streaming query. (An optional cell for reading the output back into a DataFrame is sketched after this step.)

    %%bash
    hdfs dfs -ls /example/streamingtripdata
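
    Optionally, you can read the parquet output back into a DataFrame to inspect the trip records. This is a sketch, not part of the tutorial notebook; it assumes the streaming query above completed and wrote files to /example/streamingtripdata:

    // Read the parquet files written by the streaming query back into a DataFrame
    val tripOutputDF = spark.read.parquet("/example/streamingtripdata")

    // Inspect a few fields from the nested "trip" struct produced by from_json
    tripOutputDF.select("trip.vendorid", "trip.trip_distance", "trip.total_amount").show(5)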
    

Clean up resources

To clean up the resources created by this tutorial, you can delete the resource group. Deleting the resource group also deletes the associated HDInsight cluster, and any other resources associated with the resource group.

To remove the resource group using the Azure portal:

  1. In the Azure portal, expand the menu on the left side to open the menu of services, and then choose Resource Groups to display the list of your resource groups.
  2. Locate the resource group to delete, and then right-click the More button (...) on the right side of the listing.
  3. Select Delete resource group, and then confirm.

Warning

HDInsight cluster billing starts once a cluster is created and stops when the cluster is deleted. Billing is pro-rated per minute, so you should always delete your cluster when it is no longer in use.

Deleting a Kafka on HDInsight cluster deletes any data stored in Kafka.

Next steps

In this tutorial, you learned how to use Apache Spark Structured Streaming to write and read data from Apache Kafka on HDInsight. Use the following link to learn how to use Apache Storm with Kafka.