Tutorial: Sentiment analysis on streaming data using Azure Databricks

In this tutorial, you learn how to run sentiment analysis on a stream of data using Azure Databricks in near real time. You set up a data ingestion system using Azure Event Hubs. You consume the messages from Event Hubs into Azure Databricks using the Spark Event Hubs connector. Finally, you use the Cognitive Services APIs to run sentiment analysis on the streamed data.

By the end of this tutorial, you will have streamed tweets from Twitter that contain the term "Azure" and run sentiment analysis on them.

The following illustration shows the application flow:

Azure Databricks with Event Hubs and Cognitive Services

This tutorial covers the following tasks:

  • Create an Azure Databricks workspace
  • Create a Spark cluster in Azure Databricks
  • Create a Twitter app to access streaming data
  • Create notebooks in Azure Databricks
  • Attach libraries for Event Hubs and Twitter API
  • Create a Cognitive Services account and retrieve the access key
  • Send tweets to Event Hubs
  • Read tweets from Event Hubs
  • Run sentiment analysis on tweets

If you don't have an Azure subscription, create a free account before you begin.

Note

This tutorial cannot be carried out using an Azure Free Trial subscription. If you have a free account, go to your profile and change your subscription to pay-as-you-go. For more information, see Azure free account. Then, remove the spending limit, and request a quota increase for vCPUs in your region. When you create your Azure Databricks workspace, you can select the Trial (Premium - 14-Days Free DBUs) pricing tier to give the workspace access to free Premium Azure Databricks DBUs for 14 days.

Prerequisites

Before you start with this tutorial, make sure to meet the following requirements:

  • An Azure Event Hubs namespace.
  • An event hub within the namespace.
  • A connection string to access the Event Hubs namespace. The connection string should have a format similar to Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=<key name>;SharedAccessKey=<key value>.
  • The shared access policy name and policy key for Event Hubs.

You can meet these requirements by completing the steps in the article Create an Azure Event Hubs namespace and event hub.
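If you already have the namespace-level connection string, a quick way to check that it matches the expected format is to parse it with the same ConnectionStringBuilder type that the notebooks in this tutorial use later. This is a minimal sketch under that assumption; the placeholder values are illustrative, so replace them with your own before running.

    import com.microsoft.azure.eventhubs.ConnectionStringBuilder

    // Parse an existing namespace connection string and point it at a specific event hub.
    // Replace the placeholders with your own values first; an exception here usually
    // means the connection string is malformed.
    val connStr = new ConnectionStringBuilder(
        "Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=<key name>;SharedAccessKey=<key value>")
      .setEventHubName("<EVENT HUB NAME>")
    println(connStr.toString())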

Sign in to the Azure portal

Sign in to the Azure portal.

Create an Azure Databricks workspace

In this section, you create an Azure Databricks workspace using the Azure portal.

  1. In the Azure portal, select Create a resource > Data + Analytics > Azure Databricks.

    Databricks on Azure portal

  2. Under Azure Databricks Service, provide the values to create a Databricks workspace.

    Create an Azure Databricks workspace

    Provide the following values:


    Property        Description
    Workspace name  Provide a name for your Databricks workspace.
    Subscription    From the drop-down, select your Azure subscription.
    Resource group  Specify whether you want to create a new resource group or use an existing one. A resource group is a container that holds related resources for an Azure solution. For more information, see Azure Resource Group overview.
    Location        Select East US 2. For other available regions, see Azure services available by region.
    Pricing Tier    Choose between Standard or Premium. For more information on these tiers, see the Databricks pricing page.

    Select Pin to dashboard and then select Create.

  3. The account creation takes a few minutes. During account creation, the portal displays the Submitting deployment for Azure Databricks tile on the right side. You may need to scroll right on your dashboard to see the tile. There is also a progress bar displayed near the top of the screen. You can watch either area for progress.

    Databricks deployment tile

Create a Spark cluster in Databricks

  1. In the Azure portal, go to the Databricks workspace that you created, and then select Launch Workspace.

  2. You are redirected to the Azure Databricks portal. From the portal, select Clusters.

    Databricks on Azure

  3. On the New Cluster page, provide the values to create a cluster.

    Create a Databricks Spark cluster on Azure

    Accept all other default values except the following:

    • Enter a name for the cluster.
    • For this article, create a cluster with the 6.0 runtime.
    • Make sure you select the Terminate after __ minutes of inactivity checkbox. Provide a duration (in minutes) after which an idle cluster is terminated.

    Select cluster worker and driver node sizes suitable for your technical criteria and budget.

    Select Create cluster. Once the cluster is running, you can attach notebooks to it and run Spark jobs.

Create a Twitter application

To receive a stream of tweets, you create an application in Twitter. Follow these instructions to create a Twitter application, and record the values that you need to complete this tutorial.

  1. From a web browser, go to Twitter For Developers, and select Create an app. You might see a message saying that you need to apply for a Twitter developer account. Feel free to do so; after your application has been approved, you should see a confirmation email. It could take several days to be approved for a developer account.

    Twitter developer account confirmation

  2. In the Create an application page, provide the details for the new app, and then select Create your Twitter application.

    Twitter application details

  3. In the application page, select the Keys and Tokens tab and copy the values for Consumer API Key and Consumer API Secret Key. Also, select Create under Access Token and Access Token Secret to generate the access tokens. Copy the values for Access Token and Access Token Secret.

    Twitter application details

Save the values that you retrieved for the Twitter application. You need these values later in the tutorial.

Attach libraries to the Spark cluster

In this tutorial, you use the Twitter APIs to send tweets to Event Hubs. You also use the Apache Spark Event Hubs connector to read and write data into Azure Event Hubs. To use these APIs as part of your cluster, add them as libraries to Azure Databricks and associate them with your Spark cluster. The following instructions show how to add a library.

  1. In the Azure Databricks workspace, select Clusters, and choose your existing Spark cluster. Within the cluster menu, choose Libraries and click Install New.

    Add library dialog box

  2. In the New Library page, for Source select Maven. For Coordinates, click Search Packages to find the package you want to add. Here are the Maven coordinates for the libraries used in this tutorial:

    • Spark Event Hubs connector - com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.10

    • Twitter API - org.twitter4j:twitter4j-core:4.0.7

      Provide Maven coordinates

  3. Select Install.

  4. Repeat these steps for the Twitter package, twitter4j-core:4.0.7.

  5. In the cluster menu, make sure both libraries are installed and attached properly.

    Check libraries

Get a Cognitive Services access key

In this tutorial, you use the Azure Cognitive Services Text Analytics APIs to run sentiment analysis on a stream of tweets in near real time. Before you use the APIs, you must create an Azure Cognitive Services account on Azure and retrieve an access key to use the Text Analytics APIs.

  1. Sign in to the Azure portal.

  2. Select + Create a resource.

  3. Under Azure Marketplace, select AI + Cognitive Services > Text Analytics API.

    Create Cognitive Services account

  4. In the Create dialog box, provide the following values:

    Create Cognitive Services account

    • Enter a name for the Cognitive Services account.

    • Select the Azure subscription under which the account is created.

    • Select an Azure location.

    • Select a pricing tier for the service. For more information about Cognitive Services pricing, see the pricing page.

    • Specify whether you want to create a new resource group or select an existing one.

      Select Create.

  5. After the account is created, from the Overview tab, select Show access keys.

    Show access keys

    Also, copy a part of the endpoint URL, as shown in the screenshot. You need this URL later in the tutorial.

  6. Under Manage keys, select the copy icon next to the key you want to use.

    Copy access keys

  7. Save the endpoint URL and access key values that you retrieved in this step. You need them later in this tutorial.

Create notebooks in Databricks

In this section, you create two notebooks in the Databricks workspace with the following names:

  • SendTweetsToEventHub - A producer notebook you use to get tweets from Twitter and stream them to Event Hubs.
  • AnalyzeTweetsFromEventHub - A consumer notebook you use to read the tweets from Event Hubs and run sentiment analysis.

  1. In the left pane, select Workspace. From the Workspace drop-down, select Create, and then select Notebook.

    Create notebook in Databricks

  2. In the Create Notebook dialog box, enter SendTweetsToEventHub, select Scala as the language, and select the Spark cluster that you created earlier.

    Create notebook in Databricks

    Select Create.

  3. Repeat the steps to create the AnalyzeTweetsFromEventHub notebook.

Send tweets to Event Hubs

In the SendTweetsToEventHub notebook, paste the following code, and replace the placeholders with the values for the Event Hubs namespace and the Twitter application that you created earlier. This notebook streams tweets with the keyword "Azure" into Event Hubs in real time.

Note

The Twitter API has certain request restrictions and quotas. If you are not satisfied with the standard rate limiting of the Twitter API, you can generate text content without using the Twitter API in this example. To do that, set the variable dataSource to test instead of twitter and populate the list testSource with preferred test input.

    import scala.collection.JavaConverters._
    import com.microsoft.azure.eventhubs._
    import java.util.concurrent._
    import scala.collection.immutable._
    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global

    val namespaceName = "<EVENT HUBS NAMESPACE>"
    val eventHubName = "<EVENT HUB NAME>"
    val sasKeyName = "<POLICY NAME>"
    val sasKey = "<POLICY KEY>"
    val connStr = new ConnectionStringBuilder()
                .setNamespaceName(namespaceName)
                .setEventHubName(eventHubName)
                .setSasKeyName(sasKeyName)
                .setSasKey(sasKey)

    val pool = Executors.newScheduledThreadPool(1)
    val eventHubClient = EventHubClient.create(connStr.toString(), pool)

    def sleep(time: Long): Unit = Thread.sleep(time)

    def sendEvent(message: String, delay: Long) = {
      sleep(delay)
      val messageData = EventData.create(message.getBytes("UTF-8"))
      eventHubClient.get().send(messageData)
      System.out.println("Sent event: " + message + "\n")
    }

    // Add your own values to the list
    val testSource = List("Azure is the greatest!", "Azure isn't working :(", "Azure is okay.")

    // Specify 'test' if you prefer to not use Twitter API and loop through a list of values you define in `testSource`
    // Otherwise specify 'twitter'
    val dataSource = "test"

    if (dataSource == "twitter") {

      import twitter4j._
      import twitter4j.TwitterFactory
      import twitter4j.Twitter
      import twitter4j.conf.ConfigurationBuilder

      // Twitter configuration!
      // Replace the values below with your own

      val twitterConsumerKey = "<CONSUMER API KEY>"
      val twitterConsumerSecret = "<CONSUMER API SECRET>"
      val twitterOauthAccessToken = "<ACCESS TOKEN>"
      val twitterOauthTokenSecret = "<TOKEN SECRET>"

      val cb = new ConfigurationBuilder()
        cb.setDebugEnabled(true)
        .setOAuthConsumerKey(twitterConsumerKey)
        .setOAuthConsumerSecret(twitterConsumerSecret)
        .setOAuthAccessToken(twitterOauthAccessToken)
        .setOAuthAccessTokenSecret(twitterOauthTokenSecret)

      val twitterFactory = new TwitterFactory(cb.build())
      val twitter = twitterFactory.getInstance()

      // Getting tweets with keyword "Azure" and sending them to the Event Hub in realtime!
      val query = new Query(" #Azure ")
      query.setCount(100)
      query.lang("en")
      var finished = false
      while (!finished) {
        val result = twitter.search(query)
        val statuses = result.getTweets()
        var lowestStatusId = Long.MaxValue
        for (status <- statuses.asScala) {
          if(!status.isRetweet()){
            sendEvent(status.getText(), 5000)
          }
          lowestStatusId = Math.min(status.getId(), lowestStatusId)
        }
        query.setMaxId(lowestStatusId - 1)
      }

    } else if (dataSource == "test") {
      // Loop through the list of test input data
      while (true) {
        testSource.foreach {
          sendEvent(_,5000)
        }
      }

    } else {
      System.out.println("Unsupported Data Source. Set 'dataSource' to \"twitter\" or \"test\"")
    }

    // Closing connection to the Event Hub
    eventHubClient.get().close()

To run the notebook, press SHIFT + ENTER. You see output like the snippet below. Each event in the output is a tweet containing the term "Azure" that has been ingested into Event Hubs.

Sent event: @Microsoft and @Esri launch Geospatial AI on Azure https://t.co/VmLUCiPm6q via @geoworldmedia #geoai #azure #gis #ArtificialIntelligence

Sent event: Public preview of Java on App Service, built-in support for Tomcat and OpenJDK
https://t.co/7vs7cKtvah
#cloudcomputing #Azure

Sent event: 4 Killer #Azure Features for #Data #Performance https://t.co/kpIb7hFO2j by @RedPixie

Sent event: Migrate your databases to a fully managed service with Azure SQL Managed Instance | #Azure | #Cloud https://t.co/sJHXN4trDk

Sent event: Top 10 Tricks to #Save Money with #Azure Virtual Machines https://t.co/F2wshBXdoz #Cloud

...
...

Read tweets from Event Hubs

In the AnalyzeTweetsFromEventHub notebook, paste the following code, and replace the placeholders with the values for the Azure event hub that you created earlier. This notebook reads the tweets that you earlier streamed into Event Hubs using the SendTweetsToEventHub notebook.


    import org.apache.spark.eventhubs._
    import com.microsoft.azure.eventhubs._

    // Build connection string with the above information
    val namespaceName = "<EVENT HUBS NAMESPACE>"
    val eventHubName = "<EVENT HUB NAME>"
    val sasKeyName = "<POLICY NAME>"
    val sasKey = "<POLICY KEY>"
    val connStr = new com.microsoft.azure.eventhubs.ConnectionStringBuilder()
                .setNamespaceName(namespaceName)
                .setEventHubName(eventHubName)
                .setSasKeyName(sasKeyName)
                .setSasKey(sasKey)

    val customEventhubParameters =
      EventHubsConf(connStr.toString())
      .setMaxEventsPerTrigger(5)

    val incomingStream = spark.readStream.format("eventhubs").options(customEventhubParameters.toMap).load()

    incomingStream.printSchema

    // Sending the incoming stream into the console.
    // Data comes in batches!
    incomingStream.writeStream.outputMode("append").format("console").option("truncate", false).start().awaitTermination()

You get the following output:

root
 |-- body: binary (nullable = true)
 |-- offset: long (nullable = true)
 |-- seqNumber: long (nullable = true)
 |-- enqueuedTime: long (nullable = true)
 |-- publisher: string (nullable = true)
 |-- partitionKey: string (nullable = true)

-------------------------------------------
Batch: 0
-------------------------------------------
+------+------+--------------+---------------+---------+------------+
|body  |offset|sequenceNumber|enqueuedTime   |publisher|partitionKey|
+------+------+--------------+---------------+---------+------------+
|[50 75 62 6C 69 63 20 70 72 65 76 69 65 77 20 6F 66 20 4A 61 76 61 20 6F 6E 20 41 70 70 20 53 65 72 76 69 63 65 2C 20 62 75 69 6C 74 2D 69 6E 20 73 75 70 70 6F 72 74 20 66 6F 72 20 54 6F 6D 63 61 74 20 61 6E 64 20 4F 70 65 6E 4A 44 4B 0A 68 74 74 70 73 3A 2F 2F 74 2E 63 6F 2F 37 76 73 37 63 4B 74 76 61 68 20 0A 23 63 6C 6F 75 64 63 6F 6D 70 75 74 69 6E 67 20 23 41 7A 75 72 65]                              |0     |0             |2018-03-09 05:49:08.86 |null     |null        |
|[4D 69 67 72 61 74 65 20 79 6F 75 72 20 64 61 74 61 62 61 73 65 73 20 74 6F 20 61 20 66 75 6C 6C 79 20 6D 61 6E 61 67 65 64 20 73 65 72 76 69 63 65 20 77 69 74 68 20 41 7A 75 72 65 20 53 51 4C 20 44 61 74 61 62 61 73 65 20 4D 61 6E 61 67 65 64 20 49 6E 73 74 61 6E 63 65 20 7C 20 23 41 7A 75 72 65 20 7C 20 23 43 6C 6F 75 64 20 68 74 74 70 73 3A 2F 2F 74 2E 63 6F 2F 73 4A 48 58 4E 34 74 72 44 6B]            |168   |1             |2018-03-09 05:49:24.752|null     |null        |
+------+------+--------------+---------------+---------+------------+

-------------------------------------------
Batch: 1
-------------------------------------------
...
...

Because the body of each message is binary, use the following snippet to convert it into a string.

    import org.apache.spark.sql.types._
    import org.apache.spark.sql.functions._

    // Event Hub message format is JSON and contains "body" field
    // Body is binary, so we cast it to string to see the actual content of the message
    val messages =
      incomingStream
      .withColumn("Offset", $"offset".cast(LongType))
      .withColumn("Time (readable)", $"enqueuedTime".cast(TimestampType))
      .withColumn("Timestamp", $"enqueuedTime".cast(LongType))
      .withColumn("Body", $"body".cast(StringType))
      .select("Offset", "Time (readable)", "Timestamp", "Body")

    messages.printSchema

    messages.writeStream.outputMode("append").format("console").option("truncate", false).start().awaitTermination()

The output now resembles the following snippet:

root
 |-- Offset: long (nullable = true)
 |-- Time (readable): timestamp (nullable = true)
 |-- Timestamp: long (nullable = true)
 |-- Body: string (nullable = true)

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----------------+----------+-------+
|Offset|Time (readable)  |Timestamp |Body
+------+-----------------+----------+-------+
|0     |2018-03-09 05:49:08.86 |1520574548|Public preview of Java on App Service, built-in support for Tomcat and OpenJDK
https://t.co/7vs7cKtvah
#cloudcomputing #Azure          |
|168   |2018-03-09 05:49:24.752|1520574564|Migrate your databases to a fully managed service with SQL Database Managed Instance | #Azure | #Cloud https://t.co/sJHXN4trDk    |
|0     |2018-03-09 05:49:02.936|1520574542|@Microsoft and @Esri launch Geospatial AI on Azure https://t.co/VmLUCiPm6q via @geoworldmedia #geoai #azure #gis #ArtificialIntelligence|
|176   |2018-03-09 05:49:20.801|1520574560|4 Killer #Azure Features for #Data #Performance https://t.co/kpIb7hFO2j by @RedPixie                                                    |
+------+-----------------+----------+-------+
-------------------------------------------
Batch: 1
-------------------------------------------
...
...

You have now streamed data from Azure Event Hubs into Azure Databricks in near real time using the Event Hubs connector for Apache Spark. For more information on how to use the Event Hubs connector for Spark, see the connector documentation.
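The EventHubsConf used above exposes more options than maxEventsPerTrigger alone. As a minimal sketch (the consumer group name below is illustrative, not part of this tutorial), you could pin the stream to a dedicated consumer group and start from the end of the stream so that only new tweets are read:

    import org.apache.spark.eventhubs.{EventHubsConf, EventPosition}

    // Read only events that arrive after the query starts, on a dedicated consumer group.
    val tunedEventhubParameters =
      EventHubsConf(connStr.toString())
        .setConsumerGroup("databricks-tutorial")  // illustrative name; create it on your event hub first
        .setStartingPosition(EventPosition.fromEndOfStream)
        .setMaxEventsPerTrigger(5)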

Run sentiment analysis on tweets

In this section, you run sentiment analysis on the tweets received using the Twitter API. For this section, you add the code snippets to the same AnalyzeTweetsFromEventHub notebook.

Start by adding a new code cell to the notebook and pasting in the snippet provided below. This snippet defines data types for working with the Language and Sentiment APIs.

import java.io._
import java.net._
import java.util._

case class Language(documents: Array[LanguageDocuments], errors: Array[Any]) extends Serializable
case class LanguageDocuments(id: String, detectedLanguages: Array[DetectedLanguages]) extends Serializable
case class DetectedLanguages(name: String, iso6391Name: String, score: Double) extends Serializable

case class Sentiment(documents: Array[SentimentDocuments], errors: Array[Any]) extends Serializable
case class SentimentDocuments(id: String, score: Double) extends Serializable

case class RequestToTextApi(documents: Array[RequestToTextApiDocument]) extends Serializable
case class RequestToTextApiDocument(id: String, text: String, var language: String = "") extends Serializable

Add a new code cell and paste the snippet provided below. This snippet defines an object that contains functions to call the Text Analytics API to run language detection and sentiment analysis. Make sure you replace the placeholder <PROVIDE ACCESS KEY HERE> with the value you retrieved for your Cognitive Services account.

import javax.net.ssl.HttpsURLConnection
import com.google.gson.Gson
import com.google.gson.GsonBuilder
import com.google.gson.JsonObject
import com.google.gson.JsonParser
import scala.util.parsing.json._

object SentimentDetector extends Serializable {

    // Cognitive Services API connection settings
    val accessKey = "<PROVIDE ACCESS KEY HERE>"
    val host = "https://cognitive-docs.cognitiveservices.azure.com" // your endpoint URL, without a trailing slash
    val languagesPath = "/text/analytics/v2.1/languages"
    val sentimentPath = "/text/analytics/v2.1/sentiment"
    val languagesUrl = new URL(host+languagesPath)
    val sentimentUrl = new URL(host+sentimentPath)
    val g = new Gson

    def getConnection(path: URL): HttpsURLConnection = {
        val connection = path.openConnection().asInstanceOf[HttpsURLConnection]
        connection.setRequestMethod("POST")
        connection.setRequestProperty("Content-Type", "text/json")
        connection.setRequestProperty("Ocp-Apim-Subscription-Key", accessKey)
        connection.setDoOutput(true)
        return connection
    }

    def prettify (json_text: String): String = {
        val parser = new JsonParser()
        val json = parser.parse(json_text).getAsJsonObject()
        val gson = new GsonBuilder().setPrettyPrinting().create()
        return gson.toJson(json)
    }

    // Handles the call to Cognitive Services API.
    def processUsingApi(request: RequestToTextApi, path: URL): String = {
        val requestToJson = g.toJson(request)
        val encoded_text = requestToJson.getBytes("UTF-8")
        val connection = getConnection(path)
        val wr = new DataOutputStream(connection.getOutputStream())
        wr.write(encoded_text, 0, encoded_text.length)
        wr.flush()
        wr.close()

        val response = new StringBuilder()
        val in = new BufferedReader(new InputStreamReader(connection.getInputStream()))
        var line = in.readLine()
        while (line != null) {
            response.append(line)
            line = in.readLine()
        }
        in.close()
        return response.toString()
    }

    // Calls the language API for specified documents.
    def getLanguage (inputDocs: RequestToTextApi): Option[Language] = {
        try {
            val response = processUsingApi(inputDocs, languagesUrl)
            // In case we need to log the json response somewhere
            val niceResponse = prettify(response)
            // Deserializing the JSON response from the API into Scala types
            val language = g.fromJson(niceResponse, classOf[Language])
            if (language.documents(0).detectedLanguages(0).iso6391Name == "(Unknown)")
                return None
            return Some(language)
        } catch {
            case e: Exception => return None
        }
    }

    // Calls the sentiment API for specified documents. Needs a language field to be set for each of them.
    def getSentiment (inputDocs: RequestToTextApi): Option[Sentiment] = {
        try {
            val response = processUsingApi(inputDocs, sentimentUrl)
            val niceResponse = prettify(response)
            // Deserializing the JSON response from the API into Scala types
            val sentiment = g.fromJson(niceResponse, classOf[Sentiment])
            return Some(sentiment)
        } catch {
            case e: Exception => return None
        }
    }
}
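
Before wiring these functions into the stream, you can smoke-test them from a notebook cell. The following is a minimal sketch (the document id and sample text are illustrative, not part of the original tutorial) that calls the language and sentiment functions directly:

// Smoke test for SentimentDetector with a single hard-coded document
val sampleRequest = RequestToTextApi(Array(RequestToTextApiDocument("1", "Azure is the greatest!")))

SentimentDetector.getLanguage(sampleRequest) match {
    case Some(language) =>
        // The sentiment API needs a language set on each document
        sampleRequest.documents(0).language = language.documents(0).detectedLanguages(0).iso6391Name
        SentimentDetector.getSentiment(sampleRequest) match {
            case Some(sentiment) => println("Sentiment score: " + sentiment.documents(0).score)
            case None => println("Sentiment call failed - check the access key and endpoint")
        }
    case None => println("Language call failed - check the access key and endpoint")
}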

Add another cell to define a Spark UDF (user-defined function) that determines sentiment.

// User Defined Function for processing content of messages to return their sentiment.
val toSentiment =
    udf((textContent: String) =>
        {
            val inputObject = new RequestToTextApi(Array(new RequestToTextApiDocument(textContent, textContent)))
            val detectedLanguage = SentimentDetector.getLanguage(inputObject)
            detectedLanguage match {
                case Some(language) =>
                    if(language.documents.size > 0) {
                        inputObject.documents(0).language = language.documents(0).detectedLanguages(0).iso6391Name
                        val sentimentDetected = SentimentDetector.getSentiment(inputObject)
                        sentimentDetected match {
                            case Some(sentiment) => {
                                if(sentiment.documents.size > 0) {
                                    sentiment.documents(0).score.toString()
                                }
                                else {
                                    "Error happened when getting sentiment: " + sentiment.errors(0).toString
                                }
                            }
                            case None => "Couldn't detect sentiment"
                        }
                    }
                    else {
                        "Error happened when getting language" + language.errors(0).toString
                    }
                case None => "Couldn't detect language"
            }
        }
    )

Add a final code cell to prepare a dataframe with the content of the tweet and the sentiment associated with the tweet.

// Prepare a dataframe with Content and Sentiment columns
val streamingDataFrame = incomingStream.selectExpr("cast (body as string) AS Content").withColumn("Sentiment", toSentiment($"Content"))

// Display the streaming data with the sentiment
streamingDataFrame.writeStream.outputMode("append").format("console").option("truncate", false).start().awaitTermination()

You should see an output like the following snippet:

-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------------------+------------------+
|Content                         |Sentiment         |
+--------------------------------+------------------+
|Public preview of Java on App Service, built-in support for Tomcat and OpenJDK
https://t.co/7vs7cKtvah   #cloudcomputing #Azure          |0.7761918306350708|
|Migrate your databases to a fully managed service with Azure SQL Managed Instance | #Azure | #Cloud https://t.co/sJHXN4trDk    |0.8558163642883301|
|@Microsoft and @Esri launch Geospatial AI on Azure https://t.co/VmLUCiPm6q via @geoworldmedia #geoai #azure #gis #ArtificialIntelligence|0.5               |
|4 Killer #Azure Features for #Data #Performance https://t.co/kpIb7hFO2j by @RedPixie                                                    |0.5               |
+--------------------------------+------------------+

A value closer to 1 in the Sentiment column suggests a great experience with Azure. A value closer to 0 suggests issues that users faced while working with Microsoft Azure.
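
If you want to act on these scores rather than just display them, you can cast the Sentiment column back to a number. The following is a minimal sketch (the 0.3 threshold is an arbitrary illustration, not from the original tutorial) that surfaces only tweets suggesting a poor experience; rows where the UDF returned an error string become null after the cast and are filtered out:

import org.apache.spark.sql.types.DoubleType

// Keep only rows whose sentiment parses as a number below the threshold
val negativeTweets = streamingDataFrame
    .withColumn("Score", $"Sentiment".cast(DoubleType))
    .filter($"Score".isNotNull && $"Score" < 0.3)

negativeTweets.writeStream.outputMode("append").format("console").option("truncate", false).start().awaitTermination()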

That's it! Using Azure Databricks, you have successfully streamed data into Azure Event Hubs, consumed the stream data using the Event Hubs connector, and then run sentiment analysis on the streaming data in near real time.

Clean up resources

After you have finished running the tutorial, you can terminate the cluster. To do so, in the Azure Databricks workspace, select Clusters from the left pane. For the cluster you want to terminate, move the cursor over the ellipsis under the Actions column, and select the Terminate icon.

Stop a Databricks cluster

If you do not manually terminate the cluster, it stops automatically, provided you selected the Terminate after __ minutes of inactivity checkbox when you created the cluster. In that case, the cluster automatically stops once it has been inactive for the specified time.

Next steps

In this tutorial, you learned how to use Azure Databricks to stream data into Azure Event Hubs and then read the streaming data from Event Hubs in real time. You learned how to:

  • Create an Azure Databricks workspace
  • Create a Spark cluster in Azure Databricks
  • Create a Twitter app to access streaming data
  • Create notebooks in Azure Databricks
  • Add and attach libraries for Event Hubs and Twitter API
  • Create a Microsoft Cognitive Services account and retrieve the access key
  • Send tweets to Event Hubs
  • Read tweets from Event Hubs
  • Run sentiment analysis on tweets

Advance to the next tutorial to learn about performing machine learning tasks using Azure Databricks.