Use Apache Spark Structured Streaming with Apache Kafka and Azure Cosmos DB

Learn how to use Apache Spark Structured Streaming to read data from Apache Kafka on Azure HDInsight, and then store the data in Azure Cosmos DB.

Azure Cosmos DB is a globally distributed, multi-model database. This example uses a SQL API database model. For more information, see the Welcome to Azure Cosmos DB document.

Spark Structured Streaming is a stream processing engine built on Spark SQL. It allows you to express streaming computations the same way you express batch computations on static data. For more information on Structured Streaming, see the Structured Streaming Programming Guide at Apache.org.
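
As a quick illustration, the following Scala sketch reads a Kafka topic as an unbounded DataFrame and queries it with the same operations you would use on static data. The broker address and topic name are placeholders, not values from this example's notebooks:

// A minimal sketch, assuming a Kafka topic named 'tripdata' and a
// placeholder broker address; adjust both for your own cluster.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("KafkaStreamSketch").getOrCreate()

// Read from Kafka as an unbounded DataFrame
val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "wn0-kafka:9092")
  .option("subscribe", "tripdata")
  .load()

// The same DataFrame operations used for batch data apply here
val messages = stream.selectExpr("CAST(value AS STRING) AS body")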

Important

This example uses Spark 2.2 on HDInsight 3.6.

The steps in this document create an Azure resource group that contains both a Spark on HDInsight and a Kafka on HDInsight cluster. These clusters are both located within an Azure Virtual Network, which allows the Spark cluster to directly communicate with the Kafka cluster.

When you are done with the steps in this document, remember to delete the clusters to avoid excess charges.

Create the clusters

Apache Kafka on HDInsight does not provide access to the Kafka brokers over the public internet. Anything that talks to Kafka must be in the same Azure virtual network as the nodes in the Kafka cluster. For this example, both the Kafka and Spark clusters are located in an Azure virtual network. The following diagram shows how communication flows between the clusters:

Diagram of Spark and Kafka clusters in an Azure virtual network

Note

The Kafka service is limited to communication within the virtual network. Other services on the cluster, such as SSH and Ambari, can be accessed over the internet. For more information on the public ports available with HDInsight, see Ports and URIs used by HDInsight.

While you can create an Azure virtual network, Kafka, and Spark clusters manually, it's easier to use an Azure Resource Manager template. Use the following steps to deploy an Azure virtual network, Kafka, and Spark clusters to your Azure subscription.

  1. Use the following button to sign in to Azure and open the template in the Azure portal.

    Deploy to Azure

    The Azure Resource Manager template is located in the GitHub repository for this project (https://github.com/Azure-Samples/hdinsight-spark-scala-kafka-cosmosdb).

    This template creates the following resources:

    • A Kafka on HDInsight 3.6 cluster.

    • A Spark on HDInsight 3.6 cluster.

    • An Azure Virtual Network, which contains the HDInsight clusters.

      Note

      The virtual network created by the template uses the 10.0.0.0/16 address space.

    • An Azure Cosmos DB SQL API database.

      Important

      The structured streaming notebook used in this example requires Spark on HDInsight 3.6. If you use an earlier version of Spark on HDInsight, you receive errors when using the notebook.

  2. Use the following information to populate the entries in the Custom deployment section:

    Subscription: Select your Azure subscription.
    Resource group: Create a group or select an existing one. This group contains the HDInsight cluster.
    Cosmos DB Account Name: This value is used as the name for the Cosmos DB account. The name can only contain lowercase letters, numbers, and the hyphen (-) character. It must be between 3 and 31 characters in length.
    Base Cluster Name: This value is used as the base name for the Spark and Kafka clusters. For example, entering myhdi creates a Spark cluster named spark-myhdi and a Kafka cluster named kafka-myhdi.
    Cluster Version: The HDInsight cluster version. This example is tested with HDInsight 3.6, and may not work with other cluster types.
    Cluster Login User Name: The admin user name for the Spark and Kafka clusters.
    Cluster Login Password: The admin user password for the Spark and Kafka clusters.
    Ssh User Name: The SSH user to create for the Spark and Kafka clusters.
    Ssh Password: The password for the SSH user for the Spark and Kafka clusters.

    HDInsight custom deployment values

  3. Read the Terms and Conditions, and then select I agree to the terms and conditions stated above.

  4. Finally, select Purchase. It may take up to 45 minutes to create the clusters, virtual network, and Cosmos DB account.

Create the Cosmos DB database and collection

The project used in this document stores data in Cosmos DB. Before running the code, you must first create a database and collection in your Cosmos DB instance. You must also retrieve the document endpoint and the key used to authenticate requests to Cosmos DB.

One way to do this is to use the Azure CLI. The following script creates a database named kafkadata and a collection named kafkacollection, and then returns the document endpoint and primary key.

#!/bin/bash

# Replace 'myresourcegroup' with the name of your resource group
resourceGroupName='myresourcegroup'
# Replace 'mycosmosaccount' with the name of your Cosmos DB account
name='mycosmosaccount'

# WARNING: If you change the databaseName or collectionName
#          then you must update the values in the Jupyter notebook
databaseName='kafkadata'
collectionName='kafkacollection'

# Create the database
az cosmosdb sql database create --account-name $name --name $databaseName --resource-group $resourceGroupName

# Create the collection
az cosmosdb sql container create --account-name $name --database-name $databaseName --name $collectionName --partition-key-path "/my/path" --resource-group $resourceGroupName

# Get the endpoint
az cosmosdb show --name $name --resource-group $resourceGroupName --query documentEndpoint

# Get the primary key
az cosmosdb keys list --name $name --resource-group $resourceGroupName --type keys

The document endpoint and primary key information is similar to the following text:

# endpoint
"https://mycosmosaccount.documents.azure.com:443/"
# key
"YqPXw3RP7TsJoBF5imkYR0QNA02IrreNAlkrUMkL8EW94YHs41bktBhIgWq4pqj6HCGYijQKMRkCTsSaKUO2pw=="

Important

Save the endpoint and key values, as they are needed in the Jupyter Notebooks.

Get the notebooks

The code for the example described in this document is available at https://github.com/Azure-Samples/hdinsight-spark-scala-kafka-cosmosdb.

Upload the notebooks

Use the following steps to upload the notebooks from the project to your Spark on HDInsight cluster:

  1. In your web browser, connect to the Jupyter notebook on your Spark cluster. In the following URL, replace CLUSTERNAME with the name of your Spark cluster:

     https://CLUSTERNAME.azurehdinsight.cn/jupyter
    

    When prompted, enter the cluster login (admin) and password used when you created the cluster.

  2. From the upper right side of the page, use the Upload button to upload the Stream-taxi-data-to-kafka.ipynb file to the cluster. Select Open to start the upload.

  3. Find the Stream-taxi-data-to-kafka.ipynb entry in the list of notebooks, and select the Upload button beside it.

  4. Repeat steps 1-3 to load the Stream-data-from-Kafka-to-Cosmos-DB.ipynb notebook.

Load taxi data into Kafka

Once the files have been uploaded, select the Stream-taxi-data-to-kafka.ipynb entry to open the notebook. Follow the steps in the notebook to load data into Kafka.
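
The notebook contains the exact code; as a rough sketch of the approach, the following Scala serializes each row of a DataFrame to JSON and writes the batch to a Kafka topic. The file path, broker list, and topic name here are assumptions for illustration, not values from the notebook:

// A minimal sketch, not the notebook's exact code.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{struct, to_json}

val spark = SparkSession.builder.appName("TaxiToKafka").getOrCreate()

// Placeholder path: substitute the location of your taxi data file
val taxiData = spark.read.option("header", "true").csv("/example/data/taxitrips.csv")

taxiData
  .select(to_json(struct("*")).alias("value"))  // serialize each row as a JSON message body
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "wn0-kafka:9092,wn1-kafka:9092")  // placeholder brokers
  .option("topic", "tripdata")                                         // placeholder topic
  .save()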

Process taxi data using Spark Structured Streaming

From the Jupyter Notebook home page, select the Stream-data-from-Kafka-to-Cosmos-DB.ipynb entry. Follow the steps in the notebook to stream data from Kafka into Azure Cosmos DB using Spark Structured Streaming.
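
The notebook has the working code; the following Scala sketch only shows the general shape of such a pipeline, reading from Kafka and writing the stream through the azure-cosmosdb-spark connector's streaming sink. The broker, topic, endpoint, key, and checkpoint values are placeholders, and the option names follow that connector's conventions rather than this notebook specifically; verify them against the notebook:

// A minimal sketch, assuming the azure-cosmosdb-spark connector is on
// the cluster's classpath and that the endpoint, key, database, and
// collection match the values you saved earlier.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("KafkaToCosmos").getOrCreate()

val kafkaStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "wn0-kafka:9092")  // placeholder broker
  .option("subscribe", "tripdata")                       // placeholder topic
  .load()
  .selectExpr("CAST(value AS STRING) AS value")

// Write the stream to Cosmos DB through the connector's sink provider
val query = kafkaStream.writeStream
  .format("com.microsoft.azure.cosmosdb.spark.streaming.CosmosDBSinkProvider")
  .option("Endpoint", "https://mycosmosaccount.documents.azure.com:443/")  // your document endpoint
  .option("Masterkey", "YOUR_PRIMARY_KEY")                                 // your primary key
  .option("Database", "kafkadata")
  .option("Collection", "kafkacollection")
  .option("checkpointLocation", "/checkpoints/kafka-to-cosmos")            // placeholder path
  .start()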

Next steps

Now that you have learned how to use Apache Spark Structured Streaming, see the following documents to learn more about working with Apache Spark, Apache Kafka, and Azure Cosmos DB: