使用 .NET Standard 中的事件处理程序主机接收消息入门

事件中心是一个服务,可用于处理来自连接设备和应用程序的大量事件数据(遥测)。 将数据采集到事件中心后,可以使用任何实时分析提供程序或存储群集来转换和存储数据。 这种大规模事件收集和处理功能是现代应用程序体系结构(包括物联网 (IoT))的重要组件。 有关事件中心的详细概述,请参阅事件中心概述事件中心功能

本教程显示如何编写 .NET Core 控制台应用程序,以使用事件处理程序主机从事件中心接收消息。 事件处理程序主机是一个 .NET 类,它通过从事件中心管理持久检查点和并行接收来简化从那些事件中心接收事件的过程。 使用事件处理程序主机,可跨多个接收方拆分事件,即使在不同节点中托管时也是如此。 此示例演示如何为单一接收方使用事件处理程序主机。 [扩大事件处理][使用事件中心扩大事件处理]示例显示如何将事件处理程序主机用于多个接收方。

Note

可以从 GitHub 下载此用作示例的快速入门,将 EventHubConnectionStringEventHubNameStorageAccountNameStorageAccountKeyStorageContainerName 字符串替换为事件中心值,并运行它。 或者,可以按照本教程中的步骤创建自己的解决方案。

先决条件

创建事件中心命名空间和事件中心

第一步是使用 Azure 门户创建事件中心类型的命名空间,并获取应用程序与事件中心进行通信所需的管理凭据。 若要创建命名空间和事件中心,请按照本文中的步骤进行操作,然后继续执行本教程的以下步骤。

为事件处理程序主机创建存储帐户

事件处理程序主机是一个智能代理,它通过管理持久性检查点和并行接收操作,来简化从事件中心接收事件的过程。 对于检查点,事件处理程序主机需要一个存储帐户。 以下示例演示如何创建存储帐户,以及如何获取其密钥以进行访问:

  1. 在 Azure 门户中,选择屏幕左上角的“创建资源”。

  2. 选择“存储”,然后选择“存储帐户 - Blob、文件、表、队列”。

    选择存储帐户

  3. 在“创建存储帐户”页中执行以下步骤:

    1. 输入存储帐户的名称。
    2. 选择包含事件中心的 Azure 订阅。
    3. 选择包含事件中心的资源组。
    4. 选择可在其中创建资源的位置。
    5. 然后单击“查看 + 创建”。

      创建存储帐户 - 页面

  4. 在“查看 + 创建”页上查看值,然后选择“创建”。

    查看存储帐户设置,然后执行创建操作

  5. 看到“部署成功”消息后,选择页面顶部的“访问资源”。 还可以通过从资源列表中选择存储帐户来启动“存储帐户”页。

    从部署中选择存储帐户

  6. 在“概要”窗口中选择“Blob”。

    选择 Blob 服务

  7. 选择顶部的“+ 容器”,为容器输入名称,然后选择“确定”。

    创建 Blob 容器

  8. 在左侧菜单中选择”访问密钥”,然后复制 key1 的值。

    将以下值保存到记事本或其他某个临时位置。

    • 存储帐户的名称
    • 存储帐户的访问密钥
    • 容器的名称

创建控制台应用程序

启动 Visual Studio。 在“文件”菜单中,单击“新建”,并单击“项目”。 创建 .NET Core 控制台应用程序。

新建项目

添加事件中心 NuGet 包

遵循以下步骤,将 Microsoft.Azure.EventHubsMicrosoft.Azure.EventHubs.Processor .NET Standard 库 NuGet 包添加项目:

  1. 右键单击新创建的项目,并选择“管理 NuGet 包” 。
  2. 单击“浏览”选项卡,然后搜索“Microsoft.Azure.EventHubs”,并选择“Microsoft.Azure.EventHubs”包。 单击“安装”以完成安装,并关闭此对话框。
  3. 重复步骤 1 和步骤 2,安装“Microsoft.Azure.EventHubs.Processor”包。

实现 IEventProcessor 接口

  1. 在“解决方案资源管理器”中,右键单击该项目,单击“添加”,并单击“类”。 将新类命名为 SimpleEventProcessor

  2. 打开 SimpleEventProcessor.cs 文件,并将以下 using 语句添加到文件顶部。

    using Microsoft.Azure.EventHubs;
    using Microsoft.Azure.EventHubs.Processor;
    using System.Threading.Tasks;
    
  3. 实现 IEventProcessor 接口。 将 SimpleEventProcessor 类的全部内容替换为以下代码:

    public class SimpleEventProcessor : IEventProcessor
    {
        public Task CloseAsync(PartitionContext context, CloseReason reason)
        {
            Console.WriteLine($"Processor Shutting Down. Partition '{context.PartitionId}', Reason: '{reason}'.");
            return Task.CompletedTask;
        }
    
        public Task OpenAsync(PartitionContext context)
        {
            Console.WriteLine($"SimpleEventProcessor initialized. Partition: '{context.PartitionId}'");
            return Task.CompletedTask;
        }
    
        public Task ProcessErrorAsync(PartitionContext context, Exception error)
        {
            Console.WriteLine($"Error on Partition: {context.PartitionId}, Error: {error.Message}");
            return Task.CompletedTask;
        }
    
        public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
        {
            foreach (var eventData in messages)
            {
                var data = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
                Console.WriteLine($"Message received. Partition: '{context.PartitionId}', Data: '{data}'");
            }
    
            return context.CheckpointAsync();
        }
    }
    

更新 Main 方法以使用 SimpleEventProcessor

  1. 在 Program.cs 文件顶部添加以下 using 语句。

    using Microsoft.Azure.EventHubs;
    using Microsoft.Azure.EventHubs.Processor;
    using System.Threading.Tasks;
    
  2. Program 类添加常量作为事件中心连接字符串、事件中心名称、存储帐户容器名称、存储帐户名称和存储帐户密钥。 添加以下代码,并将占位符替换为其对应的值:

    private const string EventHubConnectionString = "{Event Hubs connection string}";
    private const string EventHubName = "{Event Hub path/name}";
    private const string StorageContainerName = "{Storage account container name}";
    private const string StorageAccountName = "{Storage account name}";
    private const string StorageAccountKey = "{Storage account key}";
    
    private static readonly string StorageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}", StorageAccountName, StorageAccountKey);
    
  3. 将名为 MainAsync 的新方法添加到 Program 类,如下所示:

    private static async Task MainAsync(string[] args)
    {
        Console.WriteLine("Registering EventProcessor...");
    
        var eventProcessorHost = new EventProcessorHost(
            EventHubName,
            PartitionReceiver.DefaultConsumerGroupName,
            EventHubConnectionString,
            StorageConnectionString,
            StorageContainerName);
    
        // Registers the Event Processor Host and starts receiving messages
        await eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>();
    
        Console.WriteLine("Receiving. Press ENTER to stop worker.");
        Console.ReadLine();
    
        // Disposes of the Event Processor Host
        await eventProcessorHost.UnregisterEventProcessorAsync();
    }
    
  4. Main 方法中添加以下代码行:

    MainAsync(args).GetAwaiter().GetResult();
    

    Program.cs 文件的内容如下所示:

    namespace SampleEphReceiver
    {
    
        public class Program
        {
            private const string EventHubConnectionString = "{Event Hubs connection string}";
            private const string EventHubName = "{Event Hub path/name}";
            private const string StorageContainerName = "{Storage account container name}";
            private const string StorageAccountName = "{Storage account name}";
            private const string StorageAccountKey = "{Storage account key}";
    
            private static readonly string StorageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1};EndpointSuffix=core.chinacloudapi.cn", StorageAccountName, StorageAccountKey);
    
            public static void Main(string[] args)
            {
                MainAsync(args).GetAwaiter().GetResult();
            }
    
            private static async Task MainAsync(string[] args)
            {
                Console.WriteLine("Registering EventProcessor...");
    
                var eventProcessorHost = new EventProcessorHost(
                    EventHubName,
                    PartitionReceiver.DefaultConsumerGroupName,
                    EventHubConnectionString,
                    StorageConnectionString,
                    StorageContainerName);
    
                // Registers the Event Processor Host and starts receiving messages
                await eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>();
    
                Console.WriteLine("Receiving. Press ENTER to stop worker.");
                Console.ReadLine();
    
                // Disposes of the Event Processor Host
                await eventProcessorHost.UnregisterEventProcessorAsync();
            }
        }
    }
    
  5. 运行程序,并确保没有任何错误。

祝贺你! 现在已使用事件处理器主机从事件中心接收消息。

Note

本教程使用单个 EventProcessorHost 实例。 若要增加吞吐量,建议运行多个 EventProcessorHost 实例,如扩展的事件处理示例中所示。 在这些情况下,为了对接收的事件进行负载均衡,多个实例会自动相互协调。

后续步骤

在本快速入门,你已创建从事件中心接收消息的 .NET Standard 应用程序。 若要了解如何使用 .NET Standard 将事件发送到事件中心,请参阅从事件中心发送事件 - .NET Standard