使用 .NET Core (Microsoft.Azure.EventHubs) 向/从 Azure 事件中心发送/接收事件Send events to or receive events from Azure Event Hubs using .NET Core (Microsoft.Azure.EventHubs)

本快速入门介绍如何使用 Microsoft.Azure.EventHubs .NET Core 库向/从事件中心发送/接收事件。This quickstart shows how to send events to and receive events from an event hub using the Microsoft.Azure.EventHubs .NET Core library.

警告

本快速入门使用旧 Microsoft.Azure.EventHubs 包。This quickstart uses the old Microsoft.Azure.EventHubs package. 有关使用最新 Azure.Messaging.EventHubs 库的快速入门,请参阅使用 Azure.Messaging.EventHubs 库发送和接收事件For a quickstart that uses the latest Azure.Messaging.EventHubs library, see Send and receive events using Azure.Messaging.EventHubs library. 若要将应用程序从使用旧库迁移到使用新库,请参阅从 Microsoft.Azure.EventHubs 迁移到 Azure.Messaging.EventHubs 的指南To move your application from using the old library to new one, see the Guide to migrate from Microsoft.Azure.EventHubs to Azure.Messaging.EventHubs.

先决条件Prerequisites

如果不熟悉 Azure 事件中心,请在阅读本快速入门之前参阅事件中心概述If you are new to Azure Event Hubs, see Event Hubs overview before you do this quickstart.

若要完成本快速入门,需要具备以下先决条件:To complete this quickstart, you need the following prerequisites:

  • Microsoft Azure 订阅。Microsoft Azure subscription. 若要使用 Azure 服务(包括 Azure 事件中心),需要一个订阅。To use Azure services, including Azure Event Hubs, you need a subscription. 如果没有现有的 Azure 帐户,可以注册试用版创建购买帐户If you don't have an existing Azure account, you can sign up for a trial or create a purchase account.
  • Microsoft Visual Studio 2019。Microsoft Visual Studio 2019.
  • .NET Core Visual Studio 2015 或 2017 工具.NET Core Visual Studio 2015 or 2017 tools.
  • 创建事件中心命名空间和事件中心。Create an Event Hubs namespace and an event hub. 第一步是使用 Azure 门户创建事件中心类型的命名空间,并获取应用程序与事件中心进行通信所需的管理凭据。The first step is to use the Azure portal to create a namespace of type Event Hubs, and obtain the management credentials your application needs to communicate with the event hub. 要创建命名空间和事件中心,请按照此文中的步骤操作。To create a namespace and an event hub, follow the procedure in this article. 然后,按照以下文章中的说明获取事件中心命名空间的连接字符串获取连接字符串Then, get the connection string for the event hub namespace by following instructions from the article: Get connection string. 稍后将在本快速入门中使用连接字符串。You use the connection string later in this quickstart.

发送事件Send events

本部分介绍如何创建一个向事件中心发送事件的 .NET Core 控制台应用程序。This section shows you how to create a .NET Core console application to send events to an event hub.

备注

可以从 GitHub 下载此用作示例的快速入门,将 EventHubConnectionStringEventHubName 字符串替换为事件中心值,并运行它。You can download this quickstart as a sample from the GitHub, replace EventHubConnectionString and EventHubName strings with your event hub values, and run it. 或者,也可以按照本快速入门中的步骤创建自己的应用程序。Alternatively, you can follow the steps in this quickstart to create your own.

创建控制台应用程序Create a console application

启动 Visual Studio。Start Visual Studio. 在“文件”菜单上,单击“新建”,然后单击“项目”。From the File menu, click New, and then click Project. 创建 .NET Core 控制台应用程序。Create a .NET Core console application.

新建项目

添加事件中心 NuGet 包Add the Event Hubs NuGet package

通过执行以下步骤,将 Microsoft.Azure.EventHubs .NET Core 库 NuGet 包添加到项目中:Add the Microsoft.Azure.EventHubs .NET Core library NuGet package to your project by following these steps:

  1. 右键单击新创建的项目,并选择“管理 NuGet 包” 。Right-click the newly created project and select Manage NuGet Packages.
  2. 单击“浏览”选项卡,然后搜索“Microsoft.Azure.EventHubs”,并选择“Microsoft.Azure.EventHubs”包********。Click the Browse tab, then search for "Microsoft.Azure.EventHubs" and select the Microsoft.Azure.EventHubs package. 单击“安装”以完成安装,并关闭此对话框。Click Install to complete the installation, then close this dialog box.

编写代码以将消息发送到事件中心Write code to send messages to the event hub

  1. 在 Program.cs 文件顶部添加以下 using 语句:Add the following using statements to the top of the Program.cs file:

    using Microsoft.Azure.EventHubs;
    using System.Text;
    using System.Threading.Tasks;
    
  2. Program 类添加常量作为事件中心连接字符串和实体路径(单个事件中心名称)。Add constants to the Program class for the Event Hubs connection string and entity path (individual event hub name). 将括号中的占位符替换为在创建事件中心时获得的相应值。Replace the placeholders in brackets with the proper values that were obtained when creating the event hub. 请确保 {Event Hubs connection string} 是命名空间级别的连接字符串,而不是事件中心字符串。Make sure that the {Event Hubs connection string} is the namespace-level connection string, and not the event hub string.

    private static EventHubClient eventHubClient;
    private const string EventHubConnectionString = "{Event Hubs connection string}";
    private const string EventHubName = "{Event Hub path/name}";
    
  3. 将名为 MainAsync 的新方法添加到 Program 类,如下所示:Add a new method named MainAsync to the Program class, as follows:

    private static async Task MainAsync(string[] args)
    {
        // Creates an EventHubsConnectionStringBuilder object from the connection string, and sets the EntityPath.
        // Typically, the connection string should have the entity path in it, but this simple scenario
        // uses the connection string from the namespace.
        var connectionStringBuilder = new EventHubsConnectionStringBuilder(EventHubConnectionString)
        {
            EntityPath = EventHubName
        };
    
        eventHubClient = EventHubClient.CreateFromConnectionString(connectionStringBuilder.ToString());
    
        await SendMessagesToEventHub(100);
    
        await eventHubClient.CloseAsync();
    
        Console.WriteLine("Press ENTER to exit.");
        Console.ReadLine();
    }
    
  4. 将名为 SendMessagesToEventHub 的新方法添加到 Program 类,如下所示:Add a new method named SendMessagesToEventHub to the Program class, as follows:

    // Uses the event hub client to send 100 messages to the event hub.
    private static async Task SendMessagesToEventHub(int numMessagesToSend)
    {
        for (var i = 0; i < numMessagesToSend; i++)
        {
            try
            {
                var message = $"Message {i}";
                Console.WriteLine($"Sending message: {message}");
                await eventHubClient.SendAsync(new EventData(Encoding.UTF8.GetBytes(message)));
            }
            catch (Exception exception)
            {
                Console.WriteLine($"{DateTime.Now} > Exception: {exception.Message}");
            }
    
            await Task.Delay(10);
        }
    
        Console.WriteLine($"{numMessagesToSend} messages sent.");
    }
    
  5. Program 类的 Main 方法中添加以下代码:Add the following code to the Main method in the Program class:

    MainAsync(args).GetAwaiter().GetResult();
    

    Program.cs 文件的内容如下所示。Here is what your Program.cs should look like.

    namespace SampleSender
    {
        using System;
        using System.Text;
        using System.Threading.Tasks;
        using Microsoft.Azure.EventHubs;
    
        public class Program
        {
            private static EventHubClient eventHubClient;
            private const string EventHubConnectionString = "{Event Hubs connection string}";
            private const string EventHubName = "{Event Hub path/name}";
    
            public static void Main(string[] args)
            {
                MainAsync(args).GetAwaiter().GetResult();
            }
    
            private static async Task MainAsync(string[] args)
            {
                // Creates an EventHubsConnectionStringBuilder object from the connection string, and sets the EntityPath.
                // Typically, the connection string should have the entity path in it, but for the sake of this simple scenario
                // we are using the connection string from the namespace.
                var connectionStringBuilder = new EventHubsConnectionStringBuilder(EventHubConnectionString)
                {
                    EntityPath = EventHubName
                };
    
                eventHubClient = EventHubClient.CreateFromConnectionString(connectionStringBuilder.ToString());
    
                await SendMessagesToEventHub(100);
    
                await eventHubClient.CloseAsync();
    
                Console.WriteLine("Press ENTER to exit.");
                Console.ReadLine();
            }
    
            // Uses the event hub client to send 100 messages to the event hub.
            private static async Task SendMessagesToEventHub(int numMessagesToSend)
            {
                for (var i = 0; i < numMessagesToSend; i++)
                {
                    try
                    {
                        var message = $"Message {i}";
                        Console.WriteLine($"Sending message: {message}");
                        await eventHubClient.SendAsync(new EventData(Encoding.UTF8.GetBytes(message)));
                    }
                    catch (Exception exception)
                    {
                        Console.WriteLine($"{DateTime.Now} > Exception: {exception.Message}");
                    }
    
                    await Task.Delay(10);
                }
    
                Console.WriteLine($"{numMessagesToSend} messages sent.");
            }
        }
    }
    
  6. 运行程序,并确保没有任何错误。Run the program, and ensure that there are no errors.

接收事件Receive events

此部分介绍如何编写 .NET Core 控制台应用程序,以使用事件处理程序主机从事件中心接收消息。This section shows how to write a .NET Core console application that receives messages from an event hub using the Event Processor Host. 事件处理程序主机是一个 .NET 类,它通过从事件中心管理持久检查点和并行接收来简化从那些事件中心接收事件的过程。The Event Processor Host is a .NET class that simplifies receiving events from event hubs by managing persistent checkpoints and parallel receives from those event hubs. 使用事件处理程序主机,可跨多个接收方拆分事件,即使在不同节点中托管时也是如此。Using the Event Processor Host, you can split events across multiple receivers, even when hosted in different nodes. 此示例演示如何为单一接收方使用事件处理程序主机。This example shows how to use the Event Processor Host for a single receiver.

备注

可以从 GitHub 下载此用作示例的快速入门,将 EventHubConnectionStringEventHubNameStorageAccountNameStorageAccountKeyStorageContainerName 字符串替换为事件中心值,并运行它。You can download this quickstart as a sample from the GitHub, replace EventHubConnectionString and EventHubName, StorageAccountName, StorageAccountKey, and StorageContainerName strings with your event hub values, and run it. 或者,可以按照本教程中的步骤创建自己的解决方案。Alternatively, you can follow the steps in this tutorial to create your own.

为事件处理程序主机创建存储帐户Create a storage account for Event Processor Host

事件处理程序主机是一个智能代理,它通过管理持久性检查点和并行接收操作,来简化从事件中心接收事件的过程。The Event Processor Host is an intelligent agent that simplifies receiving events from Event Hubs by managing persistent checkpoints and parallel receives. 对于检查点,事件处理程序主机需要一个存储帐户。For checkpointing, the Event Processor Host requires a storage account. 以下示例演示如何创建存储帐户,以及如何获取其密钥以进行访问:The following example shows how to create a storage account and how to get its keys for access:

  1. 在 Azure 门户菜单中,选择“创建资源” 。From the Azure portal menu, select Create a resource.

    创建资源菜单项,Microsoft Azure 门户

  2. 选择“存储” “存储帐户” > 。Select Storage > Storage account.

    选择存储帐户,Microsoft Azure 门户

  3. 在“创建存储帐户”页中执行以下步骤: On the Create storage account page, take the following steps:

    1. 输入“存储帐户名称” 。Enter the Storage account name.

    2. 选择包含事件中心的 Azure 订阅。 Choose an Azure Subscription that contains the event hub.

    3. 选择或创建包含事件中心的“资源组”。 Choose or create the Resource group that has the event hub.

    4. 选择要在其中创建资源的“位置”。 Pick a Location in which to create the resource.

    5. 选择“查看 + 创建” 。Select Review + create.

      查看 + 创建、创建存储帐户,Microsoft Azure 门户

  4. 在“查看 + 创建” 页上查看值,然后选择“创建”。 On the Review + create page, review the values, and select Create.

    查看存储帐户设置并创建,Microsoft Azure 门户

  5. 在通知中看到“部署已成功”消息后,选择“前往资源”以打开“存储帐户”页 。After you see the Deployments Succeeded message in your notifications, select Go to resource to open the Storage Account page. 或者,可以展开“部署详细信息”,然后从资源列表中选择新资源 。Alternatively, you can expand Deployment details and then select your new resource from the resource list.

    前往资源、存储帐户部署,Microsoft Azure 门户

  6. 选择“容器” 。Select Containers.

    选择“Blob 容器服务”、“存储帐户”,Microsoft Azure 门户

  7. 选择顶部的“+ 容器” ,为容器输入“名称” ,然后选择“确定” 。Select + Container at the top, enter a Name for the container, and select OK.

    创建新的 Blob 容器、存储帐户,Microsoft Azure 门户

  8. 从“存储帐户”页菜单中选择“访问密钥”,并复制“key1”的值 。Choose Access keys from the Storage account page menu, and copy the value of key1.

    将以下值保存到记事本或其他某个临时位置。Save the following values to Notepad or some other temporary location.

    • 存储帐户的名称Name of the storage account
    • 存储帐户的访问密钥Access key for the storage account
    • 容器的名称Name of the container

创建控制台应用程序Create a console application

启动 Visual Studio。Start Visual Studio. 在“文件”菜单上,单击“新建”,然后单击“项目”。From the File menu, click New, and then click Project. 创建 .NET Core 控制台应用程序。Create a .NET Core console application.

用于接收的新项目

添加事件中心 NuGet 包Add the Event Hubs NuGet package

遵循以下步骤,将 Microsoft.Azure.EventHubsMicrosoft.Azure.EventHubs.Processor .NET Standard 库 NuGet 包添加项目:Add the Microsoft.Azure.EventHubs and Microsoft.Azure.EventHubs.Processor .NET Standard library NuGet packages to your project by following these steps:

  1. 右键单击新创建的项目,并选择“管理 NuGet 包” 。Right-click the newly created project and select Manage NuGet Packages.
  2. 单击“浏览”选项卡,然后搜索“Microsoft.Azure.EventHubs”,并选择“Microsoft.Azure.EventHubs”包************。Click the Browse tab, search for Microsoft.Azure.EventHubs, and then select the Microsoft.Azure.EventHubs package. 单击“安装”以完成安装,并关闭此对话框。Click Install to complete the installation, then close this dialog box.
  3. 重复步骤 1 和步骤 2,安装“Microsoft.Azure.EventHubs.Processor”包****。Repeat steps 1 and 2, and install the Microsoft.Azure.EventHubs.Processor package.

实现 IEventProcessor 接口Implement the IEventProcessor interface

  1. 在“解决方案资源管理器”中,右键单击该项目,单击“添加”,并单击“类”。********In Solution Explorer, right-click the project, click Add, and then click Class. 将新类命名为 SimpleEventProcessorName the new class SimpleEventProcessor.

  2. 打开 SimpleEventProcessor.cs 文件,并将以下 using 语句添加到文件顶部。Open the SimpleEventProcessor.cs file and add the following using statements to the top of the file.

    using Microsoft.Azure.EventHubs;
    using Microsoft.Azure.EventHubs.Processor;
    using System.Threading.Tasks;
    
  3. 实现 IEventProcessor 接口。Implement the IEventProcessor interface. SimpleEventProcessor 类的全部内容替换为以下代码:Replace the entire contents of the SimpleEventProcessor class with the following code:

    public class SimpleEventProcessor : IEventProcessor
    {
        public Task CloseAsync(PartitionContext context, CloseReason reason)
        {
            Console.WriteLine($"Processor Shutting Down. Partition '{context.PartitionId}', Reason: '{reason}'.");
            return Task.CompletedTask;
        }
    
        public Task OpenAsync(PartitionContext context)
        {
            Console.WriteLine($"SimpleEventProcessor initialized. Partition: '{context.PartitionId}'");
            return Task.CompletedTask;
        }
    
        public Task ProcessErrorAsync(PartitionContext context, Exception error)
        {
            Console.WriteLine($"Error on Partition: {context.PartitionId}, Error: {error.Message}");
            return Task.CompletedTask;
        }
    
        public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
        {
            foreach (var eventData in messages)
            {
                var data = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
                Console.WriteLine($"Message received. Partition: '{context.PartitionId}', Data: '{data}'");
            }
    
            return context.CheckpointAsync();
        }
    }
    

更新 Main 方法以使用 SimpleEventProcessorUpdate the Main method to use SimpleEventProcessor

  1. 在 Program.cs 文件顶部添加以下 using 语句。Add the following using statements to the top of the Program.cs file.

    using Microsoft.Azure.EventHubs;
    using Microsoft.Azure.EventHubs.Processor;
    using System.Threading.Tasks;
    
  2. Program 类添加常量作为事件中心连接字符串、事件中心名称、存储帐户容器名称、存储帐户名称和存储帐户密钥。Add constants to the Program class for the event hub connection string, event hub name, storage account container name, storage account name, and storage account key. 添加以下代码,并将占位符替换为其对应的值:Add the following code, replacing the placeholders with their corresponding values:

    private const string EventHubConnectionString = "{Event Hubs connection string}";
    private const string EventHubName = "{Event Hub path/name}";
    private const string StorageContainerName = "{Storage account container name}";
    private const string StorageAccountName = "{Storage account name}";
    private const string StorageAccountKey = "{Storage account key}";
    
    private static readonly string StorageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}", StorageAccountName, StorageAccountKey);
    
  3. 将名为 MainAsync 的新方法添加到 Program 类,如下所示:Add a new method named MainAsync to the Program class, as follows:

    private static async Task MainAsync(string[] args)
    {
        Console.WriteLine("Registering EventProcessor...");
    
        var eventProcessorHost = new EventProcessorHost(
            EventHubName,
            PartitionReceiver.DefaultConsumerGroupName,
            EventHubConnectionString,
            StorageConnectionString,
            StorageContainerName);
    
        // Registers the Event Processor Host and starts receiving messages
        await eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>();
    
        Console.WriteLine("Receiving. Press ENTER to stop worker.");
        Console.ReadLine();
    
        // Disposes of the Event Processor Host
        await eventProcessorHost.UnregisterEventProcessorAsync();
    }
    
  4. Main 方法中添加以下代码行:Add the following line of code to the Main method:

    MainAsync(args).GetAwaiter().GetResult();
    

    Program.cs 文件的内容如下所示:Here is what your Program.cs file should look like:

    namespace SampleEphReceiver
    {
    
        public class Program
        {
            private const string EventHubConnectionString = "{Event Hubs connection string}";
            private const string EventHubName = "{Event Hub path/name}";
            private const string StorageContainerName = "{Storage account container name}";
            private const string StorageAccountName = "{Storage account name}";
            private const string StorageAccountKey = "{Storage account key}";
    
            private static readonly string StorageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}", StorageAccountName, StorageAccountKey);
    
            public static void Main(string[] args)
            {
                MainAsync(args).GetAwaiter().GetResult();
            }
    
            private static async Task MainAsync(string[] args)
            {
                Console.WriteLine("Registering EventProcessor...");
    
                var eventProcessorHost = new EventProcessorHost(
                    EventHubName,
                    PartitionReceiver.DefaultConsumerGroupName,
                    EventHubConnectionString,
                    StorageConnectionString,
                    StorageContainerName);
    
                // Registers the Event Processor Host and starts receiving messages
                await eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>();
    
                Console.WriteLine("Receiving. Press ENTER to stop worker.");
                Console.ReadLine();
    
                // Disposes of the Event Processor Host
                await eventProcessorHost.UnregisterEventProcessorAsync();
            }
        }
    }
    
  5. 运行程序,并确保没有任何错误。Run the program, and ensure that there are no errors.

后续步骤Next steps

请阅读以下文章:Read the following articles: