使用 .NET Core 将事件发送到 Azure 事件中心或从其接收事件Send events to or receive events from Azure Event Hubs using .NET Core

事件中心是一个服务,可用于处理来自连接设备和应用程序的大量事件数据(遥测)。Event Hubs is a service that processes large amounts of event data (telemetry) from connected devices and applications. 将数据采集到事件中心后,可以使用任何实时分析提供程序或存储群集来转换和存储数据。After you collect data into Event Hubs, you can store the data using a storage cluster or transform it using a real-time analytics provider. 这种大规模事件收集和处理功能是现代应用程序体系结构(包括物联网 (IoT))的重要组件。This large-scale event collection and processing capability is a key component of modern application architectures including the Internet of Things (IoT). 有关事件中心的详细概述,请参阅事件中心概述事件中心功能For detailed overview of Event Hubs, see Event Hubs overview and Event Hubs features.

本教程介绍如何在 C# 中创建 .NET Core 应用程序,以便将事件发送到事件中心或从其接收事件。This tutorial shows how to create .NET Core applications in C# to send events to or receive events from an event hub.

Note

可以从 GitHub 下载此用作示例的快速入门,将 EventHubConnectionStringEventHubName 字符串替换为事件中心值,并运行它。You can download this quickstart as a sample from the GitHub, replace EventHubConnectionString and EventHubName strings with your event hub values, and run it. 或者,可以按照本教程中的步骤创建自己的解决方案。Alternatively, you can follow the steps in this tutorial to create your own.

先决条件Prerequisites

  • Microsoft Visual Studio 2015 或 2017Microsoft Visual Studio 2015 or 2017. 本教程中的示例使用 Visual Studio 2017,但也支持 Visual Studio 2015。The examples in this tutorial use Visual Studio 2017, but Visual Studio 2015 is also supported.
  • .NET Core Visual Studio 2015 或 2017 工具.NET Core Visual Studio 2015 or 2017 tools.
  • 创建事件中心命名空间和事件中心Create an Event Hubs namespace and an event hub. 第一步是使用 Azure 门户创建事件中心类型的命名空间,并获取应用程序与事件中心进行通信所需的管理凭据。The first step is to use the Azure portal to create a namespace of type Event Hubs, and obtain the management credentials your application needs to communicate with the event hub. 要创建命名空间和事件中心,请按照此文中的步骤操作。To create a namespace and an event hub, follow the procedure in this article. 然后,按照以下文章中的说明获取事件中心命名空间的连接字符串获取连接字符串Then, get the connection string for the event hub namespace by following instructions from the article: Get connection string. 本教程后面的步骤将使用此连接字符串。You use the connection string later in this tutorial.

发送事件Send events

此部分介绍如何创建可将事件发送到事件中心的 .NET Core 控制台应用程序。This section shows you how to create a .NET Core console application to send events to an event hub.

创建控制台应用程序Create a console application

启动 Visual Studio。Start Visual Studio. 在“文件”菜单中,单击“新建”,并单击“项目”。From the File menu, click New, and then click Project. 创建 .NET Core 控制台应用程序。Create a .NET Core console application.

新建项目

添加事件中心 NuGet 包Add the Event Hubs NuGet package

通过执行以下步骤,将 Microsoft.Azure.EventHubs .NET Core 库 NuGet 包添加到项目中:Add the Microsoft.Azure.EventHubs .NET Core library NuGet package to your project by following these steps:

  1. 右键单击新创建的项目,并选择“管理 NuGet 包” 。Right-click the newly created project and select Manage NuGet Packages.
  2. 单击“浏览”选项卡,然后搜索“Microsoft.Azure.EventHubs”,并选择“Microsoft.Azure.EventHubs”包。Click the Browse tab, then search for "Microsoft.Azure.EventHubs" and select the Microsoft.Azure.EventHubs package. 单击“安装”以完成安装,并关闭此对话框。Click Install to complete the installation, then close this dialog box.

编写代码以将消息发送到事件中心Write code to send messages to the event hub

  1. 在 Program.cs 文件顶部添加以下 using 语句:Add the following using statements to the top of the Program.cs file:

    using Microsoft.Azure.EventHubs;
    using System.Text;
    using System.Threading.Tasks;
    
  2. Program 类添加常量作为事件中心连接字符串和实体路径(单个事件中心名称)。Add constants to the Program class for the Event Hubs connection string and entity path (individual event hub name). 将括号中的占位符替换为在创建事件中心时获得的相应值。Replace the placeholders in brackets with the proper values that were obtained when creating the event hub. 请确保 {Event Hubs connection string} 是命名空间级别的连接字符串,而不是事件中心字符串。Make sure that the {Event Hubs connection string} is the namespace-level connection string, and not the event hub string.

    private static EventHubClient eventHubClient;
    private const string EventHubConnectionString = "{Event Hubs connection string}";
    private const string EventHubName = "{Event Hub path/name}";
    
  3. 将名为 MainAsync 的新方法添加到 Program 类,如下所示:Add a new method named MainAsync to the Program class, as follows:

    private static async Task MainAsync(string[] args)
    {
        // Creates an EventHubsConnectionStringBuilder object from the connection string, and sets the EntityPath.
        // Typically, the connection string should have the entity path in it, but this simple scenario
        // uses the connection string from the namespace.
        var connectionStringBuilder = new EventHubsConnectionStringBuilder(EventHubConnectionString)
        {
            EntityPath = EventHubName
        };
    
        eventHubClient = EventHubClient.CreateFromConnectionString(connectionStringBuilder.ToString());
    
        await SendMessagesToEventHub(100);
    
        await eventHubClient.CloseAsync();
    
        Console.WriteLine("Press ENTER to exit.");
        Console.ReadLine();
    }
    
  4. 将名为 SendMessagesToEventHub 的新方法添加到 Program 类,如下所示:Add a new method named SendMessagesToEventHub to the Program class, as follows:

    // Creates an event hub client and sends 100 messages to the event hub.
    private static async Task SendMessagesToEventHub(int numMessagesToSend)
    {
        for (var i = 0; i < numMessagesToSend; i++)
        {
            try
            {
                var message = $"Message {i}";
                Console.WriteLine($"Sending message: {message}");
                await eventHubClient.SendAsync(new EventData(Encoding.UTF8.GetBytes(message)));
            }
            catch (Exception exception)
            {
                Console.WriteLine($"{DateTime.Now} > Exception: {exception.Message}");
            }
    
            await Task.Delay(10);
        }
    
        Console.WriteLine($"{numMessagesToSend} messages sent.");
    }
    
  5. Program 类的 Main 方法中添加以下代码:Add the following code to the Main method in the Program class:

    MainAsync(args).GetAwaiter().GetResult();
    

    Program.cs 文件的内容如下所示。Here is what your Program.cs should look like.

    namespace SampleSender
    {
        using System;
        using System.Text;
        using System.Threading.Tasks;
        using Microsoft.Azure.EventHubs;
    
        public class Program
        {
            private static EventHubClient eventHubClient;
            private const string EventHubConnectionString = "{Event Hubs connection string}";
            private const string EventHubName = "{Event Hub path/name}";
    
            public static void Main(string[] args)
            {
                MainAsync(args).GetAwaiter().GetResult();
            }
    
            private static async Task MainAsync(string[] args)
            {
                // Creates an EventHubsConnectionStringBuilder object from the connection string, and sets the EntityPath.
                // Typically, the connection string should have the entity path in it, but for the sake of this simple scenario
                // we are using the connection string from the namespace.
                var connectionStringBuilder = new EventHubsConnectionStringBuilder(EventHubConnectionString)
                {
                    EntityPath = EventHubName
                };
    
                eventHubClient = EventHubClient.CreateFromConnectionString(connectionStringBuilder.ToString());
    
                await SendMessagesToEventHub(100);
    
                await eventHubClient.CloseAsync();
    
                Console.WriteLine("Press ENTER to exit.");
                Console.ReadLine();
            }
    
            // Creates an event hub client and sends 100 messages to the event hub.
            private static async Task SendMessagesToEventHub(int numMessagesToSend)
            {
                for (var i = 0; i < numMessagesToSend; i++)
                {
                    try
                    {
                        var message = $"Message {i}";
                        Console.WriteLine($"Sending message: {message}");
                        await eventHubClient.SendAsync(new EventData(Encoding.UTF8.GetBytes(message)));
                    }
                    catch (Exception exception)
                    {
                        Console.WriteLine($"{DateTime.Now} > Exception: {exception.Message}");
                    }
    
                    await Task.Delay(10);
                }
    
                Console.WriteLine($"{numMessagesToSend} messages sent.");
            }
        }
    }
    
  6. 运行程序,并确保没有任何错误。Run the program, and ensure that there are no errors.

接收事件Receive events

此部分介绍如何编写 .NET Core 控制台应用程序,以使用事件处理程序主机从事件中心接收消息。This section shows how to write a .NET Core console application that receives messages from an event hub using the Event Processor Host. 事件处理程序主机是一个 .NET 类,它通过从事件中心管理持久检查点和并行接收来简化从那些事件中心接收事件的过程。The Event Processor Host is a .NET class that simplifies receiving events from event hubs by managing persistent checkpoints and parallel receives from those event hubs. 使用事件处理程序主机,可跨多个接收方拆分事件,即使在不同节点中托管时也是如此。Using the Event Processor Host, you can split events across multiple receivers, even when hosted in different nodes. 此示例演示如何为单一接收方使用事件处理程序主机。This example shows how to use the Event Processor Host for a single receiver.

Note

可以从 GitHub 下载此用作示例的快速入门,将 EventHubConnectionStringEventHubNameStorageAccountNameStorageAccountKeyStorageContainerName 字符串替换为事件中心值,并运行它。You can download this quickstart as a sample from the GitHub, replace EventHubConnectionString and EventHubName, StorageAccountName, StorageAccountKey, and StorageContainerName strings with your event hub values, and run it. 或者,可以按照本教程中的步骤创建自己的解决方案。Alternatively, you can follow the steps in this tutorial to create your own.

为事件处理程序主机创建存储帐户Create a storage account for Event Processor Host

事件处理程序主机是一个智能代理,它通过管理持久性检查点和并行接收操作,来简化从事件中心接收事件的过程。The Event Processor Host is an intelligent agent that simplifies receiving events from Event Hubs by managing persistent checkpoints and parallel receives. 对于检查点,事件处理程序主机需要一个存储帐户。For checkpointing, the Event Processor Host requires a storage account. 以下示例演示如何创建存储帐户,以及如何获取其密钥以进行访问:The following example shows how to create a storage account and how to get its keys for access:

  1. 在 Azure 门户中,选择屏幕左上角的“创建资源”。In the Azure portal, and select Create a resource at the top left of the screen.

  2. 选择“存储”,然后选择“存储帐户 - Blob、文件、表、队列”。Select Storage, then select Storage account - blob, file, table, queue.

    选择存储帐户

  3. 在“创建存储帐户”页中执行以下步骤:On the Create storage account page, take the following steps:

    1. 输入存储帐户的名称。Enter a name for the storage account.

    2. 选择包含事件中心的 Azure 订阅。Choose an Azure subscription that contains the event hub.

    3. 选择包含事件中心的资源组。Select the resource group that has the event hub.

    4. 选择可在其中创建资源的位置。Select a location in which to create the resource.

    5. 然后单击“查看 + 创建”。Then click Review + create.

      创建存储帐户 - 页面

  4. 在“查看 + 创建”页上查看值,然后选择“创建”。On the Review + create page, review the values, and select Create.

    查看存储帐户设置,然后执行创建操作

  5. 看到“部署成功”消息后,选择页面顶部的“访问资源”。After you see the Deployments Succeeded message, select Got to resource at the top of the page. 还可以通过从资源列表中选择存储帐户来启动“存储帐户”页。You can also launch the Storage Account page by selecting your storage account from the resource list.

    从部署中选择存储帐户

  6. 在“概要”窗口中选择“Blob”。In the Essentials window, select Blobs.

    选择 Blob 服务

  7. 选择顶部的“+ 容器”,为容器输入名称,然后选择“确定”。Select + Container at the top, enter a name for the container, and select OK.

    创建 Blob 容器

  8. 在左侧菜单中选择”访问密钥”,然后复制 key1 的值。Select Access keys in the left-side menu, and copy the value of key1.

    将以下值保存到记事本或其他某个临时位置。Save the following values to Notepad or some other temporary location.

    • 存储帐户的名称Name of the storage account
    • 存储帐户的访问密钥Access key for the storage account
    • 容器的名称Name of the container

创建控制台应用程序Create a console application

启动 Visual Studio。Start Visual Studio. 在“文件”菜单中,单击“新建”,并单击“项目”。From the File menu, click New, and then click Project. 创建 .NET Core 控制台应用程序。Create a .NET Core console application.

新建项目

添加事件中心 NuGet 包Add the Event Hubs NuGet package

遵循以下步骤,将 Microsoft.Azure.EventHubsMicrosoft.Azure.EventHubs.Processor .NET Standard 库 NuGet 包添加项目:Add the Microsoft.Azure.EventHubs and Microsoft.Azure.EventHubs.Processor .NET Standard library NuGet packages to your project by following these steps:

  1. 右键单击新创建的项目,并选择“管理 NuGet 包” 。Right-click the newly created project and select Manage NuGet Packages.
  2. 单击“浏览”选项卡,然后搜索“Microsoft.Azure.EventHubs”,并选择“Microsoft.Azure.EventHubs”包。Click the Browse tab, search for Microsoft.Azure.EventHubs, and then select the Microsoft.Azure.EventHubs package. 单击“安装”以完成安装,并关闭此对话框。Click Install to complete the installation, then close this dialog box.
  3. 重复步骤 1 和步骤 2,安装“Microsoft.Azure.EventHubs.Processor”包。Repeat steps 1 and 2, and install the Microsoft.Azure.EventHubs.Processor package.

实现 IEventProcessor 接口Implement the IEventProcessor interface

  1. 在“解决方案资源管理器”中,右键单击该项目,单击“添加”,并单击“类”。In Solution Explorer, right-click the project, click Add, and then click Class. 将新类命名为 SimpleEventProcessorName the new class SimpleEventProcessor.

  2. 打开 SimpleEventProcessor.cs 文件,并将以下 using 语句添加到文件顶部。Open the SimpleEventProcessor.cs file and add the following using statements to the top of the file.

    using Microsoft.Azure.EventHubs;
    using Microsoft.Azure.EventHubs.Processor;
    using System.Threading.Tasks;
    
  3. 实现 IEventProcessor 接口。Implement the IEventProcessor interface. SimpleEventProcessor 类的全部内容替换为以下代码:Replace the entire contents of the SimpleEventProcessor class with the following code:

    public class SimpleEventProcessor : IEventProcessor
    {
        public Task CloseAsync(PartitionContext context, CloseReason reason)
        {
            Console.WriteLine($"Processor Shutting Down. Partition '{context.PartitionId}', Reason: '{reason}'.");
            return Task.CompletedTask;
        }
    
        public Task OpenAsync(PartitionContext context)
        {
            Console.WriteLine($"SimpleEventProcessor initialized. Partition: '{context.PartitionId}'");
            return Task.CompletedTask;
        }
    
        public Task ProcessErrorAsync(PartitionContext context, Exception error)
        {
            Console.WriteLine($"Error on Partition: {context.PartitionId}, Error: {error.Message}");
            return Task.CompletedTask;
        }
    
        public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
        {
            foreach (var eventData in messages)
            {
                var data = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
                Console.WriteLine($"Message received. Partition: '{context.PartitionId}', Data: '{data}'");
            }
    
            return context.CheckpointAsync();
        }
    }
    

更新 Main 方法以使用 SimpleEventProcessorUpdate the Main method to use SimpleEventProcessor

  1. 在 Program.cs 文件顶部添加以下 using 语句。Add the following using statements to the top of the Program.cs file.

    using Microsoft.Azure.EventHubs;
    using Microsoft.Azure.EventHubs.Processor;
    using System.Threading.Tasks;
    
  2. Program 类添加常量作为事件中心连接字符串、事件中心名称、存储帐户容器名称、存储帐户名称和存储帐户密钥。Add constants to the Program class for the event hub connection string, event hub name, storage account container name, storage account name, and storage account key. 添加以下代码,并将占位符替换为其对应的值:Add the following code, replacing the placeholders with their corresponding values:

    private const string EventHubConnectionString = "{Event Hubs connection string}";
    private const string EventHubName = "{Event Hub path/name}";
    private const string StorageContainerName = "{Storage account container name}";
    private const string StorageAccountName = "{Storage account name}";
    private const string StorageAccountKey = "{Storage account key}";
    
    private static readonly string StorageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}", StorageAccountName, StorageAccountKey);
    
  3. 将名为 MainAsync 的新方法添加到 Program 类,如下所示:Add a new method named MainAsync to the Program class, as follows:

    private static async Task MainAsync(string[] args)
    {
        Console.WriteLine("Registering EventProcessor...");
    
        var eventProcessorHost = new EventProcessorHost(
            EventHubName,
            PartitionReceiver.DefaultConsumerGroupName,
            EventHubConnectionString,
            StorageConnectionString,
            StorageContainerName);
    
        // Registers the Event Processor Host and starts receiving messages
        await eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>();
    
        Console.WriteLine("Receiving. Press ENTER to stop worker.");
        Console.ReadLine();
    
        // Disposes of the Event Processor Host
        await eventProcessorHost.UnregisterEventProcessorAsync();
    }
    
  4. Main 方法中添加以下代码行:Add the following line of code to the Main method:

    MainAsync(args).GetAwaiter().GetResult();
    

    Program.cs 文件的内容如下所示:Here is what your Program.cs file should look like:

    namespace SampleEphReceiver
    {
    
        public class Program
        {
            private const string EventHubConnectionString = "{Event Hubs connection string}";
            private const string EventHubName = "{Event Hub path/name}";
            private const string StorageContainerName = "{Storage account container name}";
            private const string StorageAccountName = "{Storage account name}";
            private const string StorageAccountKey = "{Storage account key}";
    
            private static readonly string StorageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}", StorageAccountName, StorageAccountKey);
    
            public static void Main(string[] args)
            {
                MainAsync(args).GetAwaiter().GetResult();
            }
    
            private static async Task MainAsync(string[] args)
            {
                Console.WriteLine("Registering EventProcessor...");
    
                var eventProcessorHost = new EventProcessorHost(
                    EventHubName,
                    PartitionReceiver.DefaultConsumerGroupName,
                    EventHubConnectionString,
                    StorageConnectionString,
                    StorageContainerName);
    
                // Registers the Event Processor Host and starts receiving messages
                await eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>();
    
                Console.WriteLine("Receiving. Press ENTER to stop worker.");
                Console.ReadLine();
    
                // Disposes of the Event Processor Host
                await eventProcessorHost.UnregisterEventProcessorAsync();
            }
        }
    }
    
  5. 运行程序,并确保没有任何错误。Run the program, and ensure that there are no errors.

后续步骤Next steps

请阅读以下文章:Read the following articles: