快速入门:使用 .NET 向/从 Azure 事件中心发送/接收事件
本快速入门介绍了如何使用 Azure.Messaging.EventHubs .NET 库向事件中心发送事件,然后从事件中心接收事件。
注意
快速入门可供你快速了解该服务。 如果已经熟悉该服务,可能需要在 GitHub 上的 .NET SDK 存储库中查看事件中心的 .NET 示例:GitHub 上的事件中心示例、GitHub 上的事件处理器示例。
先决条件
如果不熟悉 Azure 事件中心,请在阅读本快速入门之前先参阅 事件中心概述。
若要完成本快速入门,需要具备以下先决条件:
- Azure 订阅。 若要使用 Azure 服务(包括 Azure 事件中心),需要一个订阅。 如果没有现有的 Azure 帐户,可以注册试用订阅帐户,或者在创建帐户时使用 MSDN 订阅者权益。
- Microsoft Visual Studio 2022。 Azure 事件中心客户端库利用 C# 8.0 中引入的新功能。 仍然可以使用之前的 C# 语言版本的库,但新语法不可用。 若要使用完整语法,建议使用 .NET Core SDK 3.0 或更高版本进行编译,并将语言版本设置为
latest
。 如果使用 Visual Studio,Visual Studio 2022 以前的版本与生成 C# 8.0 项目时所需的工具将不兼容。 可在此处下载 Visual Studio 2022(包括免费的 Community Edition)。 - 创建事件中心命名空间和事件中心。 第一步是使用 Azure 门户在命名空间中创建事件中心命名空间和事件中心。 然后,获取应用程序与事件中心通信所需的管理凭据。 要创建命名空间和事件中心,请参阅 快速入门:使用 Azure 门户创建事件中心。
向 Azure 验证应用
本快速入门介绍了连接到 Azure 事件中心的两种方法:
- 无密码(Microsoft Entra 身份验证)
- 连接字符串
第一个选项展示了如何使用 Azure Active Directory 中的安全主体和基于角色的访问控制(RBAC) 连接到事件中心命名空间。 无需担心代码、配置文件或安全存储(如 Azure 密钥保管库)中存在硬编码的连接字符串。
第二个选项展示了如何使用 连接字符串 连接到事件中心命名空间。 如果不熟悉 Azure,你可能会感觉连接字符串选项更易于使用。 建议在实际应用程序和生产环境中使用无密码选项。 有关详细信息,请参阅身份验证和授权。 还可以在概述页上阅读有关无密码身份验证的详细信息。
将角色分配到 Microsoft Entra 用户帐户
在本地开发时,请确保连接到 Azure 事件中心的用户帐户具有正确的权限。 你需要拥有 Azure 事件中心数据所有者角色才能发送和接收消息。 若要为自己分配此角色,需要具有“用户访问管理员”角色,或者具有包含 Microsoft.Authorization/roleAssignments/write
操作的其他角色。 可使用 Azure 门户、Azure CLI 或 Azure PowerShell 向用户分配 Azure RBAC 角色。 可在范围概述页上详细了解角色分配的可用范围。
以下示例将 Azure Event Hubs Data Owner
角色分配给用户帐户,该角色提供对 Azure 事件中心资源的完全访问权限。 在实际方案中,遵循最小特权原则,仅向用户提供更安全的生产环境所需的最小权限。
Azure 事件中心的内置 Azure 角色
对于 Azure 事件中心,通过 Azure 门户和 Azure 资源管理 API 对命名空间和所有相关资源进行的管理已使用 Azure RBAC 模型进行了保护。 Azure 提供以下 Azure 内置角色,用于授予对事件中心命名空间的访问权限:
- Azure 事件中心数据所有者:允许对事件中心命名空间及其实体(队列、主题、订阅和筛选器)进行数据访问
- Azure 事件中心数据发送者:使用此角色授予对事件中心命名空间及其实体的发送者访问权限。
- Azure 事件中心数据接收者:使用此角色授予对事件中心命名空间及其实体的接收者访问权限。
如果要创建自定义角色,请参阅执行事件中心操作所需的权限。
重要
在大多数情况下,角色分配在 Azure 中传播需要一两分钟。 在极少数情况下,最多可能需要 8 分钟。 如果在首次运行代码时收到身份验证错误,请稍等片刻再试。
在 Azure 门户中,使用主搜索栏或左侧导航找到你的事件中心命名空间。
在概述页面上,从左侧菜单中选择“访问控制(IAM)”。
在“访问控制 (IAM)”页上,选择“角色分配”选项卡。
从顶部菜单中选择“+ 添加”,然后从出现的下拉菜单中选择“添加角色分配”。
使用搜索框将结果筛选为所需角色。 对于此示例,请搜索
Azure Event Hubs Data Owner
并选择匹配的结果。 然后选择“下一步” 。在“访问权限分配对象”下,选择“用户、组或服务主体”,然后选择“+ 选择成员”。
在对话框中,搜索 Microsoft Entra ID 用户名(通常是 user@domain 电子邮件地址),然后选中对话框底部的“选择”。
选择“查看 + 分配”转到最后一页,然后再次选择“查看 + 分配”完成该过程。
启动 Visual Studio 并登录到 Azure
可以使用以下步骤授权对服务总线命名空间的访问:
启动 Visual Studio。 如果看到“入门”窗口,请在右窗格中选择“在不使用代码的情况下继续”链接。
选择 Visual Studio 右上角的“登录”按钮。
使用你之前为其分配角色的 Microsoft Entra 帐户登录。
向事件中心发送事件
本部分介绍了如何创建 .NET Core 控制台应用程序以向你创建的事件中心发送事件。
创建控制台应用程序
如果 Visual Studio 2022 已打开,请依次选择菜单上的“文件”、“新建”、“项目”。 否则,如果看到弹出窗口,请启动 Visual Studio 2022 并选择“新建项目”。
在“创建新项目”对话框中执行以下步骤:如果看不到此对话框,请在菜单中选择“文件”,然后依次选择“新建”、“项目”。
选择“C#”作为编程语言。
选择“控制台”作为应用程序类型。
从结果列表中选择“控制台应用程序”。
然后,选择“下一步” 。
输入 EventHubsSender 作为项目名称、EventHubsQuickStart 作为解决方案名称,然后选择“下一步”。
在“其他信息”页上,选择“创建”。
向项目添加 NuGet 包
在菜单中选择“工具”>“NuGet 包管理器”>“包管理器控制台”。
运行以下命令以安装 Azure.Messaging.EventHubs 和 Azure.Identity NuGet 包。 按 ENTER 运行第二个命令。
Install-Package Azure.Messaging.EventHubs Install-Package Azure.Identity
编写代码以将事件发送到事件中心
将
Program.cs
文件中的现有代码替换为以下示例代码。 然后,将EventHubProducerClient
参数的<EVENT_HUB_NAMESPACE>
和<HUB_NAME>
占位符值替换为事件中心命名空间和事件中心的名称。 例如"spehubns0309.servicebus.chinacloudapi.cn"
和"spehub"
。下面是代码中的重要步骤:
- 使用命名空间和事件中心名称创建一个 EventHubProducerClient 对象。
- 对 EventHubProducerClient 对象调用 CreateBatchAsync 方法,创建一个 EventDataBatch 对象。
- 使用 EventDataBatch.TryAdd 方法将事件添加到批处理中。
- 使用 EventHubProducerClient.SendAsync 方法将这批消息发送到事件中心。
using Azure.Identity; using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Producer; using System.Text; // number of events to be sent to the event hub int numOfEvents = 3; // The Event Hubs client types are safe to cache and use as a singleton for the lifetime // of the application, which is best practice when events are being published or read regularly. // TODO: Replace the <EVENT_HUB_NAMESPACE> and <HUB_NAME> placeholder values EventHubProducerClient producerClient = new EventHubProducerClient( "<EVENT_HUB_NAMESPACE>.servicebus.chinacloudapi.cn", "<HUB_NAME>", new DefaultAzureCredential()); // Create a batch of events using EventDataBatch eventBatch = await producerClient.CreateBatchAsync(); for (int i = 1; i <= numOfEvents; i++) { if (!eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes($"Event {i}")))) { // if it is too large for the batch throw new Exception($"Event {i} is too large for the batch and cannot be sent."); } } try { // Use the producer client to send the batch of events to the event hub await producerClient.SendAsync(eventBatch); Console.WriteLine($"A batch of {numOfEvents} events has been published."); Console.ReadLine(); } finally { await producerClient.DisposeAsync(); }
生成项目并确保没有错误。
运行程序并等待出现确认消息。
A batch of 3 events has been published.
重要
如果使用无密码(Azure Active Directory 的基于角色的访问控制)身份验证,请选择“工具”,然后选择“选项”。 在“选项”窗口中,展开“Azure 服务身份验证”,然后选择“帐户选择”。 确认你使用的是已添加到事件中心命名空间上“Azure 事件中心数据所有者”角色的帐户。
在 Azure 门户的“事件中心命名空间”页面,可以在“邮件”图表中看到三封传入的邮件。 根据需要刷新页面以更新图表。 可能需要在几秒钟后才会显示已收到消息。
注意
有关包含更详细注释的完整源代码,请参阅 GitHub 上的此文件
从事件中心接收事件
本部分介绍如何编写一个使用事件处理程序从事件中心接收事件的 .NET Core 控制台应用程序。 事件处理器简化了从事件中心接收事件的过程。
创建 Azure 存储帐户和 Blob 容器
本快速入门使用 Azure 存储作为检查点存储。 按照以下步骤创建 Azure 存储帐户。
- 创建 Azure 存储帐户
- 创建一个 blob 容器
- 使用 Microsoft Entra ID(无密码)身份验证或命名空间的连接字符串对 blob 容器进行身份验证。
使用 Azure Blob 存储作为检查点存储时,请遵循以下建议:
- 对每个使用者组使用单独的容器。 可以使用同一存储帐户,但每个组使用一个容器。
- 请勿将容器用于任何其他用途,也不要将存储帐户用于任何其他用途。
- 存储帐户应位于部署的应用程序所在的同一区域中。 如果应用程序位于本地,请尝试选择最近的区域。
在 Azure 门户的“存储帐户”页上的“Blob 服务”部分,确保禁用以下设置。
- 分层命名空间
- Blob 软删除
- 版本控制
在本地开发时,请确保访问 Blob 数据的用户帐户具有正确的权限。 需有“存储 Blob 数据参与者”角色才能读取和写入 Blob 数据。 若要为你自己分配此角色,需要具有“用户访问管理员”角色,或者具有包含 Microsoft.Authorization/roleAssignments/write 操作的其他角色。 可使用 Azure 门户、Azure CLI 或 Azure PowerShell 向用户分配 Azure RBAC 角色。 可以在范围概述页上详细了解角色分配的可用范围。
在此方案中,你将为用户帐户分配权限(范围为存储帐户)以遵循最低权限原则。 这种做法仅为用户提供所需的最低权限,并创建更安全的生产环境。
以下示例将“存储 Blob 数据参与者”角色分配给用户帐户,该角色提供对存储帐户中 Blob 数据的读取和写入访问权限。
重要
在大多数情况下,角色分配在 Azure 中传播需要一两分钟的时间,但极少数情况下最多可能需要 8 分钟。 如果在首次运行代码时收到身份验证错误,请稍等片刻再试。
在 Azure 门户中,使用主搜索栏或左侧导航找到存储帐户。
在存储帐户概述页的左侧菜单中选择“访问控制 (IAM)”。
在“访问控制 (IAM)”页上,选择“角色分配”选项卡。
从顶部菜单中选择“+ 添加”,然后从出现的下拉菜单中选择“添加角色分配”。
使用搜索框将结果筛选为所需角色。 在此示例中,搜索“存储 Blob 数据参与者”并选择匹配的结果,然后选择“下一步”。
在“访问权限分配对象”下,选择“用户、组或服务主体”,然后选择“+ 选择成员”。
在对话框中,搜索 Microsoft Entra ID 用户名(通常是 user@domain 电子邮件地址),然后选中对话框底部的“选择”。
选择“查看 + 分配”转到最后一页,然后再次选择“查看 + 分配”完成该过程。
为接收器创建项目
- 在“解决方案资源管理器”窗口中,右键单击“EventHubQuickStart”解决方案,指向“添加”,然后选择“新建项目”。
- 依次选择“控制台应用程序”、“下一步” 。
- 输入 EventHubsReceiver 作为“项目名称”,然后选择“创建”。
- 在“解决方案资源管理器”窗口中,右键单击“EventHubsReceiver”并选择“设为启动项目” 。
向项目添加 NuGet 包
在菜单中选择“工具”>“NuGet 包管理器”>“包管理器控制台”。
在“包管理器控制台”窗口中,确认为“默认项目”选择了“EventHubsReceiver” 。 如果不是,请使用下拉列表选择“EventHubsReceiver”。
运行以下命令安装 Azure.Messaging.EventHubs 和 Azure.Identity NuGet 包。 按 ENTER 运行最新命令。
Install-Package Azure.Messaging.EventHubs Install-Package Azure.Messaging.EventHubs.Processor Install-Package Azure.Identity
更新代码
将 Program.cs 的内容替换为以下代码:
将
Program.cs
文件中的现有代码替换为以下示例代码。 然后,替换BlobContainerClient
URI 的<STORAGE_ACCOUNT_NAME>
和<BLOB_CONTAINER_NAME>
占位符值。 同时请替换EventProcessorClient
的<EVENT_HUB_NAMESPACE>
和<HUB_NAME>
占位符值。下面是代码中的重要步骤:
- 使用事件中心命名空间和事件中心名称创建 EventProcessorClient 对象。 需要在先前创建的 Azure 存储中为容器生成 BlobContainerClient 对象。
- 为 EventProcessorClient 对象的 ProcessEventAsync 和 ProcessErrorAsync 事件指定处理程序。
- 通过对 EventProcessorClient 对象调用 StartProcessingAsync,开始处理事件。
- 通过对 EventProcessorClient 对象调用 StopProcessingAsync,在 30 秒后停止处理事件。
using Azure.Identity; using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Consumer; using Azure.Messaging.EventHubs.Processor; using Azure.Storage.Blobs; using System.Text; // Create a blob container client that the event processor will use // TODO: Replace <STORAGE_ACCOUNT_NAME> and <BLOB_CONTATINAER_NAME> with actual names BlobContainerClient storageClient = new BlobContainerClient( new Uri("https://<STORAGE_ACCOUNT_NAME>.blob.core.chinacloudapi.cn/<BLOB_CONTAINER_NAME>"), new DefaultAzureCredential()); // Create an event processor client to process events in the event hub // TODO: Replace the <EVENT_HUBS_NAMESPACE> and <HUB_NAME> placeholder values var processor = new EventProcessorClient( storageClient, EventHubConsumerClient.DefaultConsumerGroupName, "<EVENT_HUB_NAMESPACE>.servicebus.chinacloudapi.cn", "<HUB_NAME>", new DefaultAzureCredential()); // Register handlers for processing events and handling errors processor.ProcessEventAsync += ProcessEventHandler; processor.ProcessErrorAsync += ProcessErrorHandler; // Start the processing await processor.StartProcessingAsync(); // Wait for 30 seconds for the events to be processed await Task.Delay(TimeSpan.FromSeconds(30)); // Stop the processing await processor.StopProcessingAsync(); Task ProcessEventHandler(ProcessEventArgs eventArgs) { // Write the body of the event to the console window Console.WriteLine("\tReceived event: {0}", Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray())); Console.ReadLine(); return Task.CompletedTask; } Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs) { // Write details about the error to the console window Console.WriteLine($"\tPartition '{eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen."); Console.WriteLine(eventArgs.Exception.Message); Console.ReadLine(); return Task.CompletedTask; }
生成项目并确保没有错误。
注意
有关包含更详细注释的完整源代码,请参阅 GitHub 上的此文件。
运行接收器应用程序。
应会看到一条消息,指出已收到事件。 看到收到的事件消息后按 Enter。
Received event: Event 1 Received event: Event 2 Received event: Event 3
这些事件是前面通过运行发送器程序发送到事件中心的三个事件。
在 Azure 门户,可以验证事件中心向接收应用程序发送的三封传出的邮件。 刷新页面以更新图表。 可能需要在几秒钟后才会显示已收到消息。
示例和参考
本快速入门分步介绍了如何实现场景:向事件中心发送一批事件,然后接收这些事件。 有关更多示例,请选择以下链接。
有关完整的 .NET 库参考,请参阅 SDK 文档。
清理资源
删除具有事件中心命名空间的资源组,或仅删除命名空间(如果要保留资源组)。
相关内容
参阅以下教程: