Process events from Azure Event Hubs with Apache Storm on HDInsight (C#)

Learn how to work with Azure Event Hubs from Apache Storm on HDInsight. This document uses a C# Storm topology to read data from and write data to Event Hubs.

SCP.NET

The steps in this document use SCP.NET, a NuGet package that makes it easy to create C# topologies and components for use with Storm on HDInsight.

Important

While the steps in this document rely on a Windows development environment with Visual Studio, the compiled project can be submitted to a Storm on HDInsight cluster that uses Linux. Only Linux-based clusters created after October 28, 2016, support SCP.NET topologies.

HDInsight 3.4 and later use Mono to run C# topologies. The example used in this document works with HDInsight 3.6. If you plan to create your own .NET solutions for HDInsight, check the Mono compatibility document for potential incompatibilities.

Cluster versioning

The Microsoft.SCP.Net.SDK NuGet package you use for your project must match the major version of Storm installed on HDInsight. HDInsight versions 3.5 and 3.6 use Storm 1.x, so you must use SCP.NET version 1.0.x.x with these clusters.

Important

The example in this document expects an HDInsight 3.5 or 3.6 cluster.

Linux is the only operating system used on HDInsight version 3.4 or later. For more information, see HDInsight retirement on Windows.

C# topologies must also target .NET 4.5.

How to work with Event Hubs

Microsoft provides a set of Java components that can be used to communicate with Event Hubs from a Storm topology. You can find the Java archive (JAR) file that contains an HDInsight 3.6 compatible version of these components at https://github.com/hdinsight/mvn-repo/raw/master/org/apache/storm/storm-eventhubs/1.1.0.1/storm-eventhubs-1.1.0.1.jar.

Important

While the components are written in Java, you can easily use them from a C# topology.

The following components are used in this example:

  • EventHubSpout: Reads data from Event Hubs.
  • EventHubBolt: Writes data to Event Hubs.
  • EventHubSpoutConfig: Used to configure EventHubSpout.
  • EventHubBoltConfig: Used to configure EventHubBolt.

Example spout usage

SCP.NET provides methods for adding an EventHubSpout to your topology. These methods make it easier to add a spout than using the generic methods for adding a Java component. The following example demonstrates how to create a spout by using the SetEventHubSpout method and the EventHubSpoutConfig class provided by SCP.NET:

topologyBuilder.SetEventHubSpout(
    "EventHubSpout",
    new EventHubSpoutConfig(
        ConfigurationManager.AppSettings["EventHubSharedAccessKeyName"],
        ConfigurationManager.AppSettings["EventHubSharedAccessKey"],
        ConfigurationManager.AppSettings["EventHubNamespace"],
        ConfigurationManager.AppSettings["EventHubEntityPath"],
        eventHubPartitions),
    eventHubPartitions);

The previous example creates a new spout component named EventHubSpout, and configures it to communicate with an event hub. The parallelism hint for the component is set to the number of partitions in the event hub. This setting allows Storm to create an instance of the component for each partition.
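To illustrate why the parallelism hint is matched to the partition count, the following sketch deals partitions out to component instances. It is only an illustration: the round-robin assignment and the `PartitionAssignmentSketch` class are assumptions made for this example, not the actual logic of EventHubSpout, which may distribute partitions differently.

```csharp
using System;
using System.Collections.Generic;

public static class PartitionAssignmentSketch
{
    // Illustrative only: assumes partitions are dealt out round-robin
    // to component instances. The real spout may use another strategy.
    public static List<int>[] Assign(int partitionCount, int instanceCount)
    {
        var plan = new List<int>[instanceCount];
        for (int i = 0; i < instanceCount; i++)
        {
            plan[i] = new List<int>();
        }
        for (int p = 0; p < partitionCount; p++)
        {
            plan[p % instanceCount].Add(p);
        }
        return plan;
    }

    public static void Main()
    {
        // Compare fewer, equal, and more instances than partitions (4 partitions).
        foreach (int instances in new[] { 2, 4, 6 })
        {
            List<int>[] plan = Assign(4, instances);
            for (int i = 0; i < plan.Length; i++)
            {
                Console.WriteLine("instances={0} instance {1}: [{2}]",
                    instances, i, String.Join(",", plan[i]));
            }
        }
    }
}
```

Under this assumption, a hint equal to the partition count gives each instance exactly one partition. A smaller hint makes some instances read several partitions, and a larger hint leaves some instances with nothing to read.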

Example bolt usage

Use the JavaComponentConstructor class to create an instance of the bolt. The following example demonstrates how to create and configure a new instance of the EventHubBolt:

// Java constructor for the Event Hub Bolt
JavaComponentConstructor constructor = JavaComponentConstructor.CreateFromClojureExpr(
    String.Format(@"(org.apache.storm.eventhubs.bolt.EventHubBolt. (org.apache.storm.eventhubs.bolt.EventHubBoltConfig. " +
        @"""{0}"" ""{1}"" ""{2}"" ""{3}"" ""{4}"" {5}))",
        ConfigurationManager.AppSettings["EventHubPolicyName"],
        ConfigurationManager.AppSettings["EventHubPolicyKey"],
        ConfigurationManager.AppSettings["EventHubNamespace"],
        "servicebus.chinacloudapi.cn",
        ConfigurationManager.AppSettings["EventHubName"],
        "true"));

// Set the bolt to subscribe to data from the spout
topologyBuilder.SetJavaBolt(
    "eventhubbolt",
    constructor,
    partitionCount)
        .shuffleGrouping("Spout");

Note

This example uses a Clojure expression passed as a string, instead of using JavaComponentConstructor to create an EventHubBoltConfig, as the spout example did. Either method works. Use the method that feels best to you.
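Because the Clojure expression is assembled with String.Format, it can help to print the resulting string before submitting the topology. The sketch below reproduces only the string-building step from the bolt example, using placeholder values. The `EventHubBoltExpression` class, its `Build` method, and the parameter names are hypothetical and chosen for this illustration. In particular, the meaning assigned to each argument (such as `partitionMode` for the final boolean) is an assumption, not something the example above states.

```csharp
using System;

public static class EventHubBoltExpression
{
    // Hypothetical helper (not part of SCP.NET): builds the same Clojure
    // expression string that the bolt example passes to
    // JavaComponentConstructor.CreateFromClojureExpr.
    // Parameter names are assumptions made for readability.
    public static string Build(string policyName, string policyKey,
        string hubNamespace, string fqdnSuffix, string hubName, bool partitionMode)
    {
        return String.Format(
            @"(org.apache.storm.eventhubs.bolt.EventHubBolt. " +
            @"(org.apache.storm.eventhubs.bolt.EventHubBoltConfig. " +
            @"""{0}"" ""{1}"" ""{2}"" ""{3}"" ""{4}"" {5}))",
            policyName, policyKey, hubNamespace, fqdnSuffix, hubName,
            partitionMode ? "true" : "false");
    }

    public static void Main()
    {
        // Placeholder values only; real values come from App.config.
        Console.WriteLine(Build("writer", "PLACEHOLDER_KEY", "mynamespace",
            "servicebus.chinacloudapi.cn", "myhub", true));
    }
}
```

The first five arguments are wrapped in double quotes so that Clojure reads them as strings, while the last is emitted unquoted so that it is read as a boolean. A value that itself contains a double quote would break the expression, so this approach assumes the settings contain none.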

Download the completed project

You can download a complete version of the project created in this tutorial from GitHub. However, you still need to provide configuration settings by following the steps in this tutorial.

Prerequisites

Download the Event Hubs components

Download the Event Hubs spout and bolt components from https://github.com/hdinsight/mvn-repo/raw/master/org/apache/storm/storm-eventhubs/1.1.0.1/storm-eventhubs-1.1.0.1.jar.

Create a directory named eventhubspout, and save the file into the directory.

Configure Event Hubs

Event Hubs is the data source for this example. To create an event hub, use the information in the "Create an event hub" section of Get started with Event Hubs.

  1. After the event hub has been created, view the Event Hubs settings in the Azure portal, and select Shared access policies. Select + Add to add the following policies:

    Name      Permissions
    writer    Send
    reader    Listen

    Screenshot of the Shared access policies window

  2. Select the reader and writer policies. Copy and save the primary key value for both policies, as these values are used later.

Configure the EventHubWriter

  1. If you have not already installed the latest version of the HDInsight tools for Visual Studio, see Get started using HDInsight tools for Visual Studio.

  2. Download the solution from eventhub-storm-hybrid.

  3. In the EventHubWriter project, open the App.config file. Use the information from the event hub that you configured earlier to fill in the value for the following keys:

    Key                       Value
    EventHubPolicyName        writer (If you used a different name for the policy with Send permission, use it instead.)
    EventHubPolicyKey         The key for the writer policy.
    EventHubNamespace         The namespace that contains your event hub.
    EventHubName              Your event hub name.
    EventHubPartitionCount    The number of partitions in your event hub.

  4. Save and close the App.config file.

Configure the EventHubReader

  1. Open the EventHubReader project.

  2. Open the App.config file for the EventHubReader. Use the information from the event hub that you configured earlier to fill in the value for the following keys:

    Key                       Value
    EventHubPolicyName        reader (If you used a different name for the policy with Listen permission, use it instead.)
    EventHubPolicyKey         The key for the reader policy.
    EventHubNamespace         The namespace that contains your event hub.
    EventHubName              Your event hub name.
    EventHubPartitionCount    The number of partitions in your event hub.

  3. Save and close the App.config file.
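A missing or mistyped App.config value usually only surfaces after the topology has been submitted, so it can be useful to check the settings locally first. The following is a minimal sketch of such a check, assuming the keys live in the appSettings section (as the ConfigurationManager.AppSettings calls in the earlier examples suggest). The `EventHubSettingsCheck` class and its `FindProblems` method are hypothetical helpers written for this illustration and are not part of SCP.NET or the sample project.

```csharp
using System;
using System.Collections.Generic;
using System.Collections.Specialized;

public static class EventHubSettingsCheck
{
    // Key names copied from the App.config tables in this document.
    public static readonly string[] RequiredKeys =
    {
        "EventHubPolicyName",
        "EventHubPolicyKey",
        "EventHubNamespace",
        "EventHubName",
        "EventHubPartitionCount"
    };

    // Hypothetical helper: returns one message per setting that is
    // missing, blank, or not usable.
    public static List<string> FindProblems(NameValueCollection settings)
    {
        var problems = new List<string>();
        foreach (string key in RequiredKeys)
        {
            // The indexer returns null when the key is absent.
            if (String.IsNullOrWhiteSpace(settings[key]))
            {
                problems.Add(key + " is missing or empty");
            }
        }

        // The partition count must also parse as a positive integer.
        string rawCount = settings["EventHubPartitionCount"];
        int count;
        if (!String.IsNullOrWhiteSpace(rawCount) &&
            (!Int32.TryParse(rawCount, out count) || count < 1))
        {
            problems.Add("EventHubPartitionCount must be a positive integer");
        }
        return problems;
    }

    public static void Main()
    {
        // Placeholder values for demonstration only.
        var settings = new NameValueCollection();
        settings["EventHubPolicyName"] = "reader";
        settings["EventHubPolicyKey"] = "PLACEHOLDER_KEY";
        settings["EventHubNamespace"] = "mynamespace";
        settings["EventHubName"] = "myhub";
        settings["EventHubPartitionCount"] = "four";

        foreach (string problem in FindProblems(settings))
        {
            Console.WriteLine(problem);
        }
    }
}
```

In a real project, you could pass ConfigurationManager.AppSettings (which is a NameValueCollection and requires a reference to System.Configuration) instead of the hand-built collection. The same check applies to both the EventHubWriter and EventHubReader projects, since they use the same key names.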

Deploy the topologies

  1. From Solution Explorer, right-click the EventHubReader project, and select Submit to Storm on HDInsight.

    Screenshot of Solution Explorer, with Submit to Storm on HDInsight highlighted

  2. On the Submit Topology dialog box, select your Storm Cluster. Expand Additional Configurations, select Java File Paths, select ..., and select the directory that contains the JAR file that you downloaded earlier. Finally, click Submit.

    Screenshot of the Submit Topology dialog box

  3. When the topology has been submitted, the Storm Topologies Viewer appears. To view information about the topology, select the EventHubReader topology in the left pane.

    Screenshot of the Storm Topologies Viewer

  4. From Solution Explorer, right-click the EventHubWriter project, and select Submit to Storm on HDInsight.

  5. On the Submit Topology dialog box, select your Storm Cluster. Expand Additional Configurations, select Java File Paths, select ..., and select the directory that contains the JAR file that you downloaded earlier. Finally, click Submit.

  6. When the topology has been submitted, refresh the topology list in the Storm Topologies Viewer to verify that both topologies are running on the cluster.

  7. In the Storm Topologies Viewer, select the EventHubReader topology.

  8. To open the component summary for the bolt, double-click the LogBolt component in the diagram.

  9. In the Executors section, select one of the links in the Port column. The link displays the information logged by the component. The logged information is similar to the following text:

     2017-03-02 14:51:29.255 m.s.p.TaskHost [INFO] Received C# STDOUT: 2017-03-02 14:51:29,255 [1] INFO  EventHubReader_LogBolt [(null)] - Received data: {"deviceValue":1830978598,"deviceId":"8566ccbc-034d-45db-883d-d8a31f34068e"}
     2017-03-02 14:51:29.283 m.s.p.TaskHost [INFO] Received C# STDOUT: 2017-03-02 14:51:29,283 [1] INFO  EventHubReader_LogBolt [(null)] - Received data: {"deviceValue":1756413275,"deviceId":"647a5eff-823d-482f-a8b4-b95b35ae570b"}
     2017-03-02 14:51:29.313 m.s.p.TaskHost [INFO] Received C# STDOUT: 2017-03-02 14:51:29,312 [1] INFO  EventHubReader_LogBolt [(null)] - Received data: {"deviceValue":1108478910,"deviceId":"206a68fa-8264-4d61-9100-bfdb68ee8f0a"}
    

Stop the topologies

To stop the topologies, select each topology in the Storm Topologies Viewer, then click Kill.

Screenshot of the Storm Topologies Viewer, with the Kill button highlighted

Delete your cluster

Warning

Billing for HDInsight clusters is prorated per minute, whether you use them or not. Be sure to delete your cluster after you finish using it. See how to delete an HDInsight cluster.

Next steps

In this document, you have learned how to use the Java Event Hubs spout and bolt from a C# topology to work with data in Azure Event Hubs. To learn more about creating C# topologies, see the following:

[Example topologies for Apache Storm on HDInsight](apache-storm-example-topology.md)