使用 .NET Framework 从 Azure 事件中心接收事件

介绍

事件中心是一个服务,可用于处理来自连接设备和应用程序的大量事件数据(遥测)。 将数据采集到事件中心后,可以使用任何实时分析提供程序或存储群集来转换和存储数据。 这种大规模事件收集和处理功能是现代应用程序体系结构(包括物联网 (IoT))的重要组件。 有关事件中心的详细概述,请参阅事件中心概述事件中心功能

本教程演示如何编写 .NET Framework 控制台应用程序,以使用事件处理程序主机从事件中心接收消息。 事件处理程序主机是一个 .NET 类,它通过从事件中心管理持久检查点和并行接收来简化从那些事件中心接收事件的过程。 使用事件处理程序主机,可跨多个接收方拆分事件,即使在不同节点中托管时也是如此。 此示例演示如何为单一接收方使用事件处理程序主机。 扩大事件处理示例显示如何将事件处理程序主机用于多个接收方。

先决条件

若要完成本教程,需要满足以下先决条件:

创建事件中心命名空间和事件中心

第一步是使用 Azure 门户创建事件中心类型的命名空间,并获取应用程序与事件中心进行通信所需的管理凭据。 若要创建命名空间和事件中心,请按照本文中的步骤进行操作,然后继续执行本教程的以下步骤。

为事件处理程序主机创建存储帐户

事件处理程序主机是一个智能代理,它通过管理持久性检查点和并行接收操作,来简化从事件中心接收事件的过程。 对于检查点,事件处理程序主机需要一个存储帐户。 以下示例演示如何创建存储帐户,以及如何获取其密钥以进行访问:

  1. 在 Azure 门户中,选择屏幕左上角的“创建资源”。

  2. 选择“存储”,然后选择“存储帐户 - Blob、文件、表、队列”。

    选择存储帐户

  3. 在“创建存储帐户”页中执行以下步骤:

    1. 输入存储帐户的名称。
    2. 选择包含事件中心的 Azure 订阅。
    3. 选择包含事件中心的资源组。
    4. 选择可在其中创建资源的位置。
    5. 然后单击“查看 + 创建”。

      创建存储帐户 - 页面

  4. 在“查看 + 创建”页上查看值,然后选择“创建”。

    查看存储帐户设置,然后执行创建操作

  5. 看到“部署成功”消息后,选择页面顶部的“访问资源”。 还可以通过从资源列表中选择存储帐户来启动“存储帐户”页。

    从部署中选择存储帐户

  6. 在“概要”窗口中选择“Blob”。

    选择 Blob 服务

  7. 选择顶部的“+ 容器”,为容器输入名称,然后选择“确定”。

    创建 Blob 容器

  8. 在左侧菜单中选择”访问密钥”,然后复制 key1 的值。

    将以下值保存到记事本或其他某个临时位置。

    • 存储帐户的名称
    • 存储帐户的访问密钥
    • 容器的名称

创建控制台应用程序

在 Visual Studio 中,使用 控制台应用程序 项目模板创建一个新的 Visual C# 桌面应用项目。 将该项目命名为 Receiver

添加事件中心 NuGet 包

  1. 在解决方案资源管理器中,右键单击“Receiver”项目,并单击“为解决方案管理 NuGet 包”。
  2. 单击“浏览”选项卡,并搜索 Microsoft Azure Service Bus Event Hub - EventProcessorHost。 单击“安装” 并接受使用条款。

    Visual Studio 下载、安装 Azure 服务总线事件中心 - EventProcessorHost NuGet 包及其所有依赖项并添加对它们的引用。

实现 IEventProcessor 接口

  1. 右键单击 Receiver 项目,单击“添加”,并单击“类”。 将新类命名为 SimpleEventProcessor,并单击“添加”以创建该类。

  2. 在 SimpleEventProcessor.cs 文件的顶部添加以下语句:

    using Microsoft.ServiceBus.Messaging;
    using System.Diagnostics;
    
  3. 用以下代码替换该类的正文:

    class SimpleEventProcessor : IEventProcessor
    {
      Stopwatch checkpointStopWatch;
    
      async Task IEventProcessor.CloseAsync(PartitionContext context, CloseReason reason)
      {
          Console.WriteLine("Processor Shutting Down. Partition '{0}', Reason: '{1}'.", context.Lease.PartitionId, reason);
          if (reason == CloseReason.Shutdown)
          {
              await context.CheckpointAsync();
          }
      }
    
      Task IEventProcessor.OpenAsync(PartitionContext context)
      {
          Console.WriteLine("SimpleEventProcessor initialized.  Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset);
          this.checkpointStopWatch = new Stopwatch();
          this.checkpointStopWatch.Start();
          return Task.FromResult<object>(null);
      }
    
      async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
      {
          foreach (EventData eventData in messages)
          {
              string data = Encoding.UTF8.GetString(eventData.GetBytes());
    
              Console.WriteLine(string.Format("Message received.  Partition: '{0}', Data: '{1}'",
                  context.Lease.PartitionId, data));
          }
    
          //Call checkpoint every 5 minutes, so that worker can resume processing from 5 minutes back if it restarts.
          if (this.checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(5))
          {
              await context.CheckpointAsync();
              this.checkpointStopWatch.Restart();
          }
      }
    }
    

    此类由 EventProcessorHost 调用,用于处理从事件中心接收的事件。 SimpleEventProcessor 类使用秒表定期对 EventProcessorHost 上下文调用检查点方法。 此操作确保接收方重启时,其丢失的处理工作不会超过五分钟。

更新 Main 方法以使用 SimpleEventProcessor

  1. Program 类中,在文件顶部添加以下 using 语句:

    using Microsoft.ServiceBus.Messaging;
    
  2. Program 类中的 Main 方法替换为以下代码,从而替换为以前保存的事件中心名称和命名空间级别连接字符串,以及在前面部分复制的存储帐户和密钥。

    static void Main(string[] args)
    {
      string eventHubConnectionString = "{Event Hubs namespace connection string}";
      string eventHubName = "{Event Hub name}";
      string storageAccountName = "{storage account name}";
      string storageAccountKey = "{storage account key}";
     string storageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1};EndpointSuffix=core.chinacloudapi.cn", storageAccountName, storageAccountKey);
    
      string eventProcessorHostName = Guid.NewGuid().ToString();
      EventProcessorHost eventProcessorHost = new EventProcessorHost(eventProcessorHostName, eventHubName, EventHubConsumerGroup.DefaultGroupName, eventHubConnectionString, storageConnectionString);
      Console.WriteLine("Registering EventProcessor...");
      var options = new EventProcessorOptions();
      options.ExceptionReceived += (sender, e) => { Console.WriteLine(e.Exception); };
      eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>(options).Wait();
    
      Console.WriteLine("Receiving. Press enter key to stop worker.");
      Console.ReadLine();
      eventProcessorHost.UnregisterEventProcessorAsync().Wait();
    }
    
  3. 运行程序,并确保没有任何错误。

祝贺! 现在已使用事件处理程序主机从事件中心接收消息。

Note

本教程使用单个 EventProcessorHost实例。 若要增加吞吐量,建议运行多个 EventProcessorHost 实例,如扩展的事件处理示例中所示。 在这些情况下,为了对接收的事件进行负载均衡,多个实例会自动相互协调。

后续步骤

在本快速入门,你已创建从事件中心接收消息的 .NET Framework 应用程序。 若要了解如何使用 .NET Framework 将事件发送到事件中心,请参阅从事件中心发送事件 - .NET Framework