Tutorial: Stream data into Azure Databricks using Event Hubs

In this tutorial, you connect a data ingestion system with Azure Databricks to stream data into an Apache Spark cluster in near real time. You set up a data ingestion system using Azure Event Hubs and then connect it to Azure Databricks to process the messages coming through. To access a stream of data, you use Twitter APIs to ingest tweets into Event Hubs. Once you have the data in Azure Databricks, you can run analytical jobs to further analyze it.

By the end of this tutorial, you will have streamed tweets from Twitter (that have the term "Azure" in them) and read them in Azure Databricks.

The following illustration shows the application flow:

Azure Databricks with Event Hubs

This tutorial covers the following tasks:

  • Create an Azure Databricks workspace
  • Create a Spark cluster in Azure Databricks
  • Create a Twitter app to access streaming data
  • Create notebooks in Azure Databricks
  • Attach libraries for Event Hubs and Twitter API
  • Send tweets to Event Hubs
  • Read tweets from Event Hubs

If you don't have an Azure subscription, create a free account before you begin.

Note

This tutorial cannot be carried out using an Azure Free Trial subscription. If you have a free account, go to your profile and change your subscription to pay-as-you-go. For more information, see Azure free account. Then, remove the spending limit, and request a quota increase for vCPUs in your region. When you create your Azure Databricks workspace, you can select the Trial (Premium - 14-Days Free DBUs) pricing tier to give the workspace access to free Premium Azure Databricks DBUs for 14 days.

Prerequisites

Before you start with this tutorial, make sure to meet the following requirements:

  • An Azure Event Hubs namespace.
  • An event hub within the namespace.
  • A connection string to access the Event Hubs namespace. The connection string should have a format similar to Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=<key name>;SharedAccessKey=<key value> (a short sketch below shows how this string is assembled in code).
  • A shared access policy name and policy key for Event Hubs.

You can meet these requirements by completing the steps in the article Create an Azure Event Hubs namespace and event hub.
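
If you want to sanity-check the connection string before you use it, the azure-eventhubs client library used later in this tutorial can assemble it from its parts. A minimal sketch with placeholder values; the printed string should match the format shown above:

    import com.microsoft.azure.eventhubs.ConnectionStringBuilder

    // Substitute your own namespace, event hub, policy name, and policy key
    val connStr = new ConnectionStringBuilder()
      .setNamespaceName("<namespace>")
      .setEventHubName("<event hub name>")
      .setSasKeyName("<key name>")
      .setSasKey("<key value>")

    // Should print: Endpoint=sb://<namespace>.servicebus.windows.net/;...
    println(connStr.toString())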

Sign in to the Azure portal

Sign in to the Azure portal.

Create an Azure Databricks workspace

In this section, you create an Azure Databricks workspace using the Azure portal.

  1. In the Azure portal, select Create a resource > Data + Analytics > Azure Databricks.

    Databricks on Azure portal

  2. Under Azure Databricks Service, provide the values to create a Databricks workspace.

    Create an Azure Databricks workspace

    Provide the following values:

    Property         Description
    Workspace name   Provide a name for your Databricks workspace.
    Subscription     From the drop-down, select your Azure subscription.
    Resource group   Specify whether you want to create a new resource group or use an existing one. A resource group is a container that holds related resources for an Azure solution. For more information, see Azure Resource Group overview.
    Location         Select East US 2. For other available regions, see Azure services available by region.
    Pricing Tier     Choose between Standard or Premium. For more information on these tiers, see the Databricks pricing page.

    Select Pin to dashboard and then select Create.

  3. The account creation takes a few minutes. During account creation, the portal displays the Submitting deployment for Azure Databricks tile on the right side. You may need to scroll right on your dashboard to see the tile. There is also a progress bar displayed near the top of the screen. You can watch either area for progress.

    Databricks deployment tile

Create a Spark cluster in Databricks

  1. In the Azure portal, go to the Databricks workspace that you created, and then select Launch Workspace.

  2. You are redirected to the Azure Databricks portal. From the portal, select Cluster.

    Databricks on Azure

  3. In the New cluster page, provide the values to create a cluster.

    Create Databricks Spark cluster on Azure

    Accept all other default values other than the following:

    • Enter a name for the cluster.
    • For this article, create a cluster with the 6.0 runtime.
    • Make sure you select the Terminate after __ minutes of inactivity checkbox. Provide a duration (in minutes) after which the cluster is terminated if it is not being used.

    Select a cluster worker and driver node size suitable for your technical criteria and budget.

    Select Create cluster. Once the cluster is running, you can attach notebooks to the cluster and run Spark jobs.

Create a Twitter application

To receive a stream of tweets, you create an application in Twitter. Follow the instructions to create a Twitter application and record the values that you need to complete this tutorial.

  1. From a web browser, go to Twitter For Developers, and select Create an app. You might see a message saying that you need to apply for a Twitter developer account. Feel free to do so; after your application has been approved, you should see a confirmation email. It could take several days to be approved for a developer account.

    Twitter developer account confirmation

  2. In the Create an application page, provide the details for the new app, and then select Create your Twitter application.

    Twitter application details

  3. In the application page, select the Keys and Tokens tab and copy the values for Consumer API Key and Consumer API Secret Key. Also, select Create under Access Token and Access Token Secret to generate the access tokens. Copy the values for Access Token and Access Token Secret.

    Twitter application details

Save the values that you retrieved for the Twitter application. You need the values later in the tutorial.

Attach libraries to the Spark cluster

In this tutorial, you use the Twitter APIs to send tweets to Event Hubs. You also use the Apache Spark Event Hubs connector to read and write data into Azure Event Hubs. To use these APIs as part of your cluster, add them as libraries to Azure Databricks and associate them with your Spark cluster. The following instructions show how to add a library.
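
If you prefer to build a standalone Spark application rather than work in notebooks, the same two coordinates can be declared in an sbt build. A sketch, assuming a local Scala 2.11 project; the build.sbt here is illustrative and not part of the portal steps that follow:

    // build.sbt (sketch) -- the same coordinates the steps below attach via Maven
    scalaVersion := "2.11.12"

    libraryDependencies ++= Seq(
      "com.microsoft.azure" % "azure-eventhubs-spark_2.11" % "2.3.12",
      "org.twitter4j"       % "twitter4j-core"             % "4.0.7"
    )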

  1. In the Azure Databricks workspace, select Clusters, and choose your existing Spark cluster. Within the cluster menu, choose Libraries and click Install New.

    Add library dialog box

  2. In the New Library page, for Source select Maven. Enter the following coordinates for the Spark Event Hubs connector and the Twitter API, one at a time, into Coordinates.

    • Spark Event Hubs connector - com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.12
    • Twitter API - org.twitter4j:twitter4j-core:4.0.7
  3. Select Install.

  4. In the cluster menu, make sure both libraries are installed and attached properly.

    Check libraries

  5. Repeat these steps for the Twitter package, twitter4j-core:4.0.7.

Create notebooks in Databricks

In this section, you create two notebooks in the Databricks workspace with the following names:

  • SendTweetsToEventHub - A producer notebook you use to get tweets from Twitter and stream them to Event Hubs.
  • ReadTweetsFromEventHub - A consumer notebook you use to read the tweets from Event Hubs.
  1. In the left pane, select Workspace. From the Workspace drop-down, select Create > Notebook.

    Create notebook in Databricks

  2. In the Create Notebook dialog box, enter SendTweetsToEventHub, select Scala as the language, and select the Spark cluster that you created earlier.

    Create notebook in Databricks

    Select Create.

  3. Repeat the steps to create the ReadTweetsFromEventHub notebook.

Send tweets to Event Hubs

SendTweetsToEventHub 笔记本中粘贴以下代码,并将占位符替换为前面创建的事件中心命名空间和 Twitter 应用程序的值。In the SendTweetsToEventHub notebook, paste the following code, and replace the placeholders with values for your Event Hubs namespace and Twitter application that you created earlier. 此 Notebook 会将包含关键字“Azure”的推文实时流式传输到事件中心。This notebook streams tweets with the keyword "Azure" into Event Hubs in real time.

Note

The Twitter API has certain request restrictions and quotas. If you are not satisfied with the standard rate limiting in the Twitter API, you can generate text content without using the Twitter API in this example. To do that, set the variable dataSource to test instead of twitter and populate the list testSource with your preferred test input.

    import scala.collection.JavaConverters._
    import com.microsoft.azure.eventhubs._
    import java.util.concurrent._
    import scala.collection.immutable._
    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global

    val namespaceName = "<EVENT HUBS NAMESPACE>"
    val eventHubName = "<EVENT HUB NAME>"
    val sasKeyName = "<POLICY NAME>"
    val sasKey = "<POLICY KEY>"
    val connStr = new ConnectionStringBuilder()
                .setNamespaceName(namespaceName)
                .setEventHubName(eventHubName)
                .setSasKeyName(sasKeyName)
                .setSasKey(sasKey)

    val pool = Executors.newScheduledThreadPool(1)
    val eventHubClient = EventHubClient.create(connStr.toString(), pool)
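    // EventHubClient.create returns a CompletableFuture; the eventHubClient.get()
    // calls below block until the client is ready, while send() itself is asynchronous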

    def sleep(time: Long): Unit = Thread.sleep(time)

    def sendEvent(message: String, delay: Long) = {
      sleep(delay)
      val messageData = EventData.create(message.getBytes("UTF-8"))
      eventHubClient.get().send(messageData)
      System.out.println("Sent event: " + message + "\n")
    }

    // Add your own values to the list
    val testSource = List("Azure is the greatest!", "Azure isn't working :(", "Azure is okay.")

    // Specify 'test' if you prefer to not use Twitter API and loop through a list of values you define in `testSource`
    // Otherwise specify 'twitter'
    val dataSource = "test"

    if (dataSource == "twitter") {

      import twitter4j._
      import twitter4j.TwitterFactory
      import twitter4j.Twitter
      import twitter4j.conf.ConfigurationBuilder

      // Twitter configuration!
      // Replace the values below with your own

      val twitterConsumerKey = "<CONSUMER API KEY>"
      val twitterConsumerSecret = "<CONSUMER API SECRET>"
      val twitterOauthAccessToken = "<ACCESS TOKEN>"
      val twitterOauthTokenSecret = "<TOKEN SECRET>"

      val cb = new ConfigurationBuilder()
        cb.setDebugEnabled(true)
        .setOAuthConsumerKey(twitterConsumerKey)
        .setOAuthConsumerSecret(twitterConsumerSecret)
        .setOAuthAccessToken(twitterOauthAccessToken)
        .setOAuthAccessTokenSecret(twitterOauthTokenSecret)

      val twitterFactory = new TwitterFactory(cb.build())
      val twitter = twitterFactory.getInstance()

      // Getting tweets with keyword "Azure" and sending them to the Event Hub in realtime!
      val query = new Query(" #Azure ")
      query.setCount(100)
      query.lang("en")
      var finished = false
      while (!finished) {
        val result = twitter.search(query)
        val statuses = result.getTweets()
        var lowestStatusId = Long.MaxValue
        for (status <- statuses.asScala) {
          if(!status.isRetweet()){
            sendEvent(status.getText(), 5000)
          }
          lowestStatusId = Math.min(status.getId(), lowestStatusId)
        }
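        // Page backwards: restrict the next search to tweets older than those already sent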
        query.setMaxId(lowestStatusId - 1)
      }

    } else if (dataSource == "test") {
      // Loop through the list of test input data
      while (true) {
        testSource.foreach {
          sendEvent(_,5000)
        }
      }

    } else {
      System.out.println("Unsupported Data Source. Set 'dataSource' to \"twitter\" or \"test\"")
    }

    // Closing connection to the Event Hub
    eventHubClient.get().close()

To run the notebook, press SHIFT + ENTER. You see an output like the snippet below. Each event in the output is a tweet ingested into Event Hubs that contains the term "Azure".

Sent event: @Microsoft and @Esri launch Geospatial AI on Azure https://t.co/VmLUCiPm6q via @geoworldmedia #geoai #azure #gis #ArtificialIntelligence

Sent event: Public preview of Java on App Service, built-in support for Tomcat and OpenJDK
https://t.co/7vs7cKtvah
#cloudcomputing #Azure

Sent event: 4 Killer #Azure Features for #Data #Performance https://t.co/kpIb7hFO2j by @RedPixie

Sent event: Migrate your databases to a fully managed service with Azure SQL Managed Instance | #Azure | #Cloud https://t.co/sJHXN4trDk

Sent event: Top 10 Tricks to #Save Money with #Azure Virtual Machines https://t.co/F2wshBXdoz #Cloud

...
...
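
Each sendEvent call above ships one message at a time. If you need higher throughput, the azure-eventhubs client can also group several events into a single batched send. A minimal sketch, assuming the eventHubClient defined in the notebook above; the createBatch, tryAdd, and getSize calls are based on the client library's batching API:

    // Sketch: batch several messages into one send for better throughput
    def sendEventBatch(messages: Seq[String]): Unit = {
      var batch = eventHubClient.get().createBatch()
      for (m <- messages) {
        val event = EventData.create(m.getBytes("UTF-8"))
        if (!batch.tryAdd(event)) {
          // Batch is full: send it and start a new one
          eventHubClient.get().send(batch).get()
          batch = eventHubClient.get().createBatch()
          batch.tryAdd(event)
        }
      }
      if (batch.getSize() > 0) eventHubClient.get().send(batch).get()
    }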

Read tweets from Event Hubs

ReadTweetsFromEventHub 笔记本中粘贴以下代码,并将占位符替换为前面创建的 Azure 事件中心的值。In the ReadTweetsFromEventHub notebook, paste the following code, and replace the placeholder with values for your Azure Event Hubs that you created earlier. 此 Notebook 读取前面使用 SendTweetsToEventHub Notebook 流式传输到事件中心的推文。This notebook reads the tweets that you earlier streamed into Event Hubs using the SendTweetsToEventHub notebook.


    import org.apache.spark.eventhubs._
    import com.microsoft.azure.eventhubs._

    // Build connection string with the above information
    val namespaceName = "<EVENT HUBS NAMESPACE>"
    val eventHubName = "<EVENT HUB NAME>"
    val sasKeyName = "<POLICY NAME>"
    val sasKey = "<POLICY KEY>"
    val connStr = new com.microsoft.azure.eventhubs.ConnectionStringBuilder()
                .setNamespaceName(namespaceName)
                .setEventHubName(eventHubName)
                .setSasKeyName(sasKeyName)
                .setSasKey(sasKey)

    // setMaxEventsPerTrigger caps each micro-batch at five events,
    // which keeps the console output small for this tutorial
    val customEventhubParameters =
      EventHubsConf(connStr.toString())
      .setMaxEventsPerTrigger(5)

    val incomingStream = spark.readStream.format("eventhubs").options(customEventhubParameters.toMap).load()

    incomingStream.printSchema

    // Sending the incoming stream into the console.
    // Data comes in batches!
    incomingStream.writeStream.outputMode("append").format("console").option("truncate", false).start().awaitTermination()

You get the following output:

root
 |-- body: binary (nullable = true)
 |-- offset: long (nullable = true)
 |-- seqNumber: long (nullable = true)
 |-- enqueuedTime: long (nullable = true)
 |-- publisher: string (nullable = true)
 |-- partitionKey: string (nullable = true)

-------------------------------------------
Batch: 0
-------------------------------------------
+------+------+--------------+---------------+---------+------------+
|body  |offset|sequenceNumber|enqueuedTime   |publisher|partitionKey|
+------+------+--------------+---------------+---------+------------+
|[50 75 62 6C 69 63 20 70 72 65 76 69 65 77 20 6F 66 20 4A 61 76 61 20 6F 6E 20 41 70 70 20 53 65 72 76 69 63 65 2C 20 62 75 69 6C 74 2D 69 6E 20 73 75 70 70 6F 72 74 20 66 6F 72 20 54 6F 6D 63 61 74 20 61 6E 64 20 4F 70 65 6E 4A 44 4B 0A 68 74 74 70 73 3A 2F 2F 74 2E 63 6F 2F 37 76 73 37 63 4B 74 76 61 68 20 0A 23 63 6C 6F 75 64 63 6F 6D 70 75 74 69 6E 67 20 23 41 7A 75 72 65]                              |0     |0             |2018-03-09 05:49:08.86 |null     |null        |
|[4D 69 67 72 61 74 65 20 79 6F 75 72 20 64 61 74 61 62 61 73 65 73 20 74 6F 20 61 20 66 75 6C 6C 79 20 6D 61 6E 61 67 65 64 20 73 65 72 76 69 63 65 20 77 69 74 68 20 41 7A 75 72 65 20 53 51 4C 20 44 61 74 61 62 61 73 65 20 4D 61 6E 61 67 65 64 20 49 6E 73 74 61 6E 63 65 20 7C 20 23 41 7A 75 72 65 20 7C 20 23 43 6C 6F 75 64 20 68 74 74 70 73 3A 2F 2F 74 2E 63 6F 2F 73 4A 48 58 4E 34 74 72 44 6B]            |168   |1             |2018-03-09 05:49:24.752|null     |null        |
+------+------+--------------+---------------+---------+------------+

-------------------------------------------
Batch: 1
-------------------------------------------
...
...
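
By default, the connector starts reading from the end of the stream, so the query only picks up events sent while it is running. If you want to replay tweets already sitting in the event hub, you can set a starting position on EventHubsConf; a short sketch using the connector's EventPosition helper:

    import org.apache.spark.eventhubs.{EventHubsConf, EventPosition}

    // Read from the beginning of the stream instead of only new events
    val replayConf = EventHubsConf(connStr.toString())
      .setStartingPosition(EventPosition.fromStartOfStream)
      .setMaxEventsPerTrigger(5)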

Because the output is in binary mode, use the following snippet to convert it into a string.

    import org.apache.spark.sql.types._
    import org.apache.spark.sql.functions._

    // Event Hub message format is JSON and contains "body" field
    // Body is binary, so we cast it to string to see the actual content of the message
    val messages =
      incomingStream
      .withColumn("Offset", $"offset".cast(LongType))
      .withColumn("Time (readable)", $"enqueuedTime".cast(TimestampType))
      .withColumn("Timestamp", $"enqueuedTime".cast(LongType))
      .withColumn("Body", $"body".cast(StringType))
      .select("Offset", "Time (readable)", "Timestamp", "Body")

    messages.printSchema

    messages.writeStream.outputMode("append").format("console").option("truncate", false).start().awaitTermination()

The output now resembles the following snippet:

root
 |-- Offset: long (nullable = true)
 |-- Time (readable): timestamp (nullable = true)
 |-- Timestamp: long (nullable = true)
 |-- Body: string (nullable = true)

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----------------+----------+-------+
|Offset|Time (readable)  |Timestamp |Body
+------+-----------------+----------+-------+
|0     |2018-03-09 05:49:08.86 |1520574548|Public preview of Java on App Service, built-in support for Tomcat and OpenJDK
https://t.co/7vs7cKtvah
#cloudcomputing #Azure          |
|168   |2018-03-09 05:49:24.752|1520574564|Migrate your databases to a fully managed service with Azure SQL Managed Instance | #Azure | #Cloud https://t.co/sJHXN4trDk    |
|0     |2018-03-09 05:49:02.936|1520574542|@Microsoft and @Esri launch Geospatial AI on Azure https://t.co/VmLUCiPm6q via @geoworldmedia #geoai #azure #gis #ArtificialIntelligence|
|176   |2018-03-09 05:49:20.801|1520574560|4 Killer #Azure Features for #Data #Performance https://t.co/kpIb7hFO2j by @RedPixie                                                    |
+------+-----------------+----------+-------+
-------------------------------------------
Batch: 1
-------------------------------------------
...
...
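
With the body decoded to a string, you can already run a small analytical job over the stream. For example, here is a sketch that counts incoming tweets per one-minute window, reusing the messages DataFrame defined above:

    // Count tweets arriving in each one-minute window (uses `messages` from above)
    val counts = messages
      .groupBy(window($"Time (readable)", "1 minute"))
      .count()

    // Complete mode re-emits the full set of window counts on every trigger
    counts.writeStream
      .outputMode("complete")
      .format("console")
      .option("truncate", false)
      .start()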

That's it! Using Azure Databricks, you have streamed data into Azure Event Hubs in near real time and consumed that stream using the Event Hubs connector for Apache Spark. For more information on how to use the Event Hubs connector for Spark, see the connector documentation.

Clean up resources

After you have finished running the tutorial, you can terminate the cluster. To do so, in the Azure Databricks workspace, select Clusters in the left pane. For the cluster you want to terminate, move the cursor over the ellipsis under the Actions column, and select the Terminate icon.

Stop a Databricks cluster

If you do not manually terminate the cluster, it stops automatically, provided you selected the Terminate after __ minutes of inactivity checkbox when you created the cluster. In that case, the cluster stops once it has been inactive for the specified time.

Next steps

In this tutorial, you learned how to:

  • Create an Azure Databricks workspace
  • Create a Spark cluster in Azure Databricks
  • Create a Twitter app to generate streaming data
  • Create notebooks in Azure Databricks
  • Add libraries for Event Hubs and Twitter API
  • Send tweets to Event Hubs
  • Read tweets from Event Hubs

Advance to the next tutorial to learn about performing sentiment analysis on the streamed data using Azure Databricks and the Cognitive Services API.