快速入门:在使用事件中心 .NET SDK (AMQP) 流式处理事件时使用 Avro 架构验证

本快速入门介绍如何使用 Azure.Messaging.EventHubs .NET 库通过架构验证向/从事件中心发送/接收事件

Azure 架构注册表 是事件中心的一项功能。 注册表为以事件驱动和以消息传送为中心的应用程序的架构提供中心存储库。 它使生成者和使用者应用程序可以灵活地交换数据,而无需管理和共享架构。 它还为可重用架构提供了一个简单的治理框架,并通过分组构造(架构组)定义了架构之间的关系。 有关详细信息,请参阅事件中心中的 Azure 架构注册表

先决条件

如果不熟悉 Azure 事件中心,请参阅 事件中心概述 ,然后再执行本快速入门。

若要完成本快速入门,需要满足以下先决条件:

  • 如果没有 Azure 订阅,请在开始之前创建 一个试用订阅

  • Microsoft Visual Studio 2022。

    Azure 事件中心客户端库使用 C# 8.0 中引入的功能。 仍然可以使用之前的 C# 语言版本的库,但新语法不可用。 若要使用完整语法,建议使用 .NET Core SDK 3.0 或更高版本进行编译,并将语言版本设置为 latest

    如果使用 Visual Studio,Visual Studio 2019 以前的版本与生成 C# 8.0 项目时所需的工具将不兼容。 要下载 Visual Studio 2019 或 Visual Studio 2022(包括免费社区版),请参阅 Visual Studio

创建事件中心

要创建事件中心命名空间和事件中心,请按照创建事件中心命名空间和事件中心的说明进行操作。

要获取事件中心命名空间的连接字符串,请按照获取连接字符串中的说明进行操作。

记下当前快速入门中要使用的以下设置:

  • 事件中心命名空间的连接字符串
  • 事件中心的名称

创建架构

要创建架构组和架构,请按照使用架构注册表创建架构中的说明进行操作。

  1. 使用架构注册表门户创建名为 contoso-sg 的架构组。 使用“Avro”作为序列化类型,使用“无”作为兼容性模式

  2. 在该架构组中,使用架构名称创建新的 Avro 架构:Microsoft.Azure.Data.SchemaRegistry.example.Order。 使用以下架构内容。

    {
      "namespace": "Microsoft.Azure.Data.SchemaRegistry.example",
      "type": "record",
      "name": "Order",
      "fields": [
        {
          "name": "id",
          "type": "string"
        },
        {
          "name": "amount",
          "type": "double"
        },
        {
          "name": "description",
          "type": "string"
        }
      ]
    } 
    

将用户添加到架构注册表读取者角色

将用户帐户添加到命名空间级别的 架构注册表读取者 角色。 还可以使用 Schema Registry Contributor 角色,但在此快速入门中不必这样做。

  1. 在“事件中心命名空间”页面上,在左侧菜单上选择“访问控制(IAM)”
  2. 在“访问控制(IAM)”页上,选择“+ 添加”“添加角色分配”>
  3. 在“角色”页上,选择“架构注册表读取者”,然后选择“下一步”
  4. 使用“+ 选择成员”链接将用户帐户添加到该角色,然后选择“下一步”。
  5. 在“查看 + 分配”页面上,选择“查看 + 分配”

使用架构验证向事件中心生成事件

为事件生成者创建控制台应用程序

  1. 启动 Visual Studio。

  2. 选择 “创建新项目”。

  3. 在“创建新项目”对话框中,执行以下步骤。 如果看不到此对话框,请在菜单上选择“文件”,然后依次选择“新建”、“项目”

    1. 选择“C#”作为编程语言。

    2. 选择“控制台”作为应用程序类型。

    3. 从结果列表中选择“控制台应用程序”。

    4. 然后选择下一步

      显示 Visual Studio“新建项目”对话框的屏幕截图。

  4. 输入“OrderProducer”作为项目名称,输入“SRQuickStart”作为解决方案名称,然后选择“确定”以创建项目

添加事件中心 NuGet 包

  1. 选择 工具>NuGet 包管理器>包管理器控制台

  2. 运行以下命令以安装 Azure.Messaging.EventHubs 和其他 NuGet 包。 按 ENTER 运行最新命令。

    Install-Package Azure.Messaging.EventHubs
    Install-Package Azure.Identity
    Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro
    Install-Package Azure.ResourceManager.Compute
    
  3. 使用 Visual Studio 对生成者应用程序进行身份验证,以连接到由世纪互联运营的 Azure。 有关详细信息,请参阅适用于 .NET 的 Azure 标识客户端库

  4. 使用在命名空间级别作为 Schema Registry Reader 角色成员的用户帐户登录到 Azure。 有关架构注册表角色的信息,请参阅 Azure 基于角色的访问控制

使用 Avro 架构生成代码

  1. 使用创建架构时所用的内容来创建名为 Order.avsc 的文件。 将文件保存在项目或解决方案文件夹中。
  2. 使用此架构文件为 .NET 生成代码。 可以使用任何外部代码生成工具(例如 avrogen)生成代码。 例如,运行 avrogen -s .\Order.avsc . 以生成代码。
  3. 生成代码后,会在 Order.cs 文件夹中看到名为 \Microsoft\Azure\Data\SchemaRegistry\example 的文件。 对于此处的 Avro 架构,会在 Microsoft.Azure.Data.SchemaRegistry.example 命名空间中生成 C# 类型。
  4. Order.cs 文件添加到 OrderProducer 项目中。

编写代码以序列化事件并将其发送到事件中心

  1. 将以下代码添加到 Program.cs 文件。 参阅代码注释了解详细信息。 代码中的概要步骤如下:

    1. 创建可用于将事件发送到事件中心的生成者客户端。
    2. 创建可用于序列化和验证 Order 对象中的数据的架构注册表客户端。
    3. 使用生成的 Order 类型创建新的 Order 对象。
    4. 使用架构注册表客户端将 Order 对象序列化为 EventData
    5. 准备一批事件。
    6. 将事件数据添加到事件批。
    7. 使用生成者客户端将该批事件发送到事件中心。
    using Azure.Data.SchemaRegistry;
    using Azure.Identity;
    using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Producer;
    
    using Microsoft.Azure.Data.SchemaRegistry.example;
    
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.chinacloudapi.cn";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
    // The Event Hubs client types are safe to cache and use as a singleton for the lifetime
    // of the application, which is best practice when events are being published or read regularly.
    EventHubProducerClient producerClient;
    
    // Create a producer client that you can use to send events to an event hub
    producerClient = new EventHubProducerClient(connectionString, eventHubName);
    
    // Create a schema registry client that you can use to serialize and validate data.  
    var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential());
    
    // Create an Avro object serializer using the Schema Registry client object. 
    var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true });
    
    // Create a new order object using the generated type/class 'Order'. 
    var sampleOrder = new Order { id = "1234", amount = 45.29, description = "First sample order." };
    EventData eventData = (EventData)await serializer.SerializeAsync(sampleOrder, messageType: typeof(EventData));
    
    // Create a batch of events 
    using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();
    
    // Add the event data to the event batch. 
    eventBatch.TryAdd(eventData);
    
    // Send the batch of events to the event hub. 
    await producerClient.SendAsync(eventBatch);
    Console.WriteLine("A batch of 1 order has been published.");        
    
  2. 将以下占位符值替换为实际值。

    • EVENTHUBSNAMESPACECONNECTIONSTRING - 事件中心命名空间的连接字符串
    • EVENTHUBNAME - 事件中心的名称
    • EVENTHUBSNAMESPACENAME - 事件中心命名空间的名称
    • SCHEMAGROUPNAME - 架构组的名称
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.chinacloudapi.cn";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
  3. 生成项目并确保没有错误。

  4. 运行程序并等待出现确认消息。

    A batch of 1 order has been published.
    
  5. 在 Azure 门户中,可以验证事件中心是否收到了事件。 切换到指标部分中的消息视图。 刷新页面以更新图表。 可能需要几秒钟才能显示已收到消息。

    Azure 门户页的图像,用于验证事件中心是否收到了事件。

通过架构验证使用来自事件中心的事件

本部分介绍如何编写一个 .NET Core 控制台应用程序,用于从事件中心接收事件,并使用架构注册表来反序列化事件数据。

其他先决条件

  • 创建要用于事件处理器的存储帐户。

创建使用者应用程序

  1. 在“解决方案资源管理器”窗口中,右键单击“SRQuickStart”解决方案,选择“添加”,然后选择“新建项目”
  2. 选择控制台应用程序,然后选择下一步
  3. 输入“OrderConsumer”作为项目名称,然后选择“创建”
  4. 在“解决方案资源管理器”窗口中,右键单击“OrderConsumer”并选择“设为启动项目”

添加事件中心 NuGet 包

  1. 选择 工具>NuGet 包管理器>包管理器控制台

  2. 在“包管理器控制台”窗口中,确认是否为“默认项目”选择了“OrderConsumer”。 如果不是,请使用下拉列表选择“OrderConsumer”

  3. 运行以下命令以安装所需的 NuGet 包。 按 ENTER 运行最新命令。

    Install-Package Azure.Messaging.EventHubs
    Install-Package Azure.Messaging.EventHubs.Processor
    Install-Package Azure.Identity
    Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro
    Install-Package Azure.ResourceManager.Compute
    
  4. 使用 Visual Studio 对生产者应用程序进行身份验证,以便连接到由世纪互联运营的 Azure,如适用于 .NET 的 Azure 身份验证客户端库所示。

  5. 使用在命名空间级别作为 Schema Registry Reader 角色成员的用户帐户登录到 Azure。 有关架构注册表角色的信息,请参阅 Azure 基于角色的访问控制

  6. 将你在创建生成者应用时生成的 Order.cs 文件添加到 OrderConsumer 项目。

  7. 右键单击 OrderConsumer 项目,然后选择“设为启动项目”

编写代码以接收事件并使用架构注册表反序列化这些事件

  1. 将以下代码添加到 Program.cs 文件。 参阅代码注释了解详细信息。 代码中的概要步骤如下:

    1. 创建可用于将事件发送到事件中心的使用者客户端。
    2. 为 Azure Blob 存储中的 Blob 容器创建 Blob 容器客户端。
    3. 创建事件处理器客户端,然后注册事件和错误处理程序。
    4. 在事件处理程序中,创建可用于将事件数据反序列化为 Order 对象的架构注册表客户端。
    5. 使用序列化程序将事件数据反序列化为 Order 对象。
    6. 打印有关收到的订单的信息。
    using Azure.Data.SchemaRegistry;
    using Azure.Identity;
    using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro;
    using Azure.Storage.Blobs;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Consumer;
    using Azure.Messaging.EventHubs.Processor;
    
    using Microsoft.Azure.Data.SchemaRegistry.example;
    
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.chinacloudapi.cn";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
    // connection string for the Azure Storage account
    const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING";
    
    // name of the blob container that will be used as a checkpoint store
    const string blobContainerName = "BLOBCONTAINERNAME";
    
    // Create a blob container client that the event processor will use 
    BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
    
    // Create an event processor client to process events in the event hub
    EventProcessorClient processor = new EventProcessorClient(storageClient, EventHubConsumerClient.DefaultConsumerGroupName, connectionString, eventHubName);
    
    // Register handlers for processing events and handling errors
    processor.ProcessEventAsync += ProcessEventHandler;
    processor.ProcessErrorAsync += ProcessErrorHandler;
    
    // Start the processing
    await processor.StartProcessingAsync();
    
    // Wait for 30 seconds for the events to be processed
    await Task.Delay(TimeSpan.FromSeconds(30));
    
    // Stop the processing
    await processor.StopProcessingAsync();
    
    static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
    {
        // Create a schema registry client that you can use to serialize and validate data.  
        var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential());
    
        // Create an Avro object serializer using the Schema Registry client object. 
        var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true });
    
        // Deserialized data in the received event using the schema 
        Order sampleOrder = (Order)await serializer.DeserializeAsync(eventArgs.Data, typeof(Order));
    
        // Print the received event
        Console.WriteLine($"Received order with ID: {sampleOrder.id}, amount: {sampleOrder.amount}, description: {sampleOrder.description}");
    
           await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
        }
    
        static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
    {
        // Write details about the error to the console window
        Console.WriteLine($"\tPartition '{eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen.");
        Console.WriteLine(eventArgs.Exception.Message);
        return Task.CompletedTask;
    }      
    
  2. 将以下占位符值替换为实际值。

    • EVENTHUBSNAMESPACE-CONNECTIONSTRING - 事件中心命名空间的连接字符串
    • EVENTHUBNAME - 事件中心的名称
    • EVENTHUBSNAMESPACENAME - 事件中心命名空间的名称
    • SCHEMAGROUPNAME - 架构组的名称
    • AZURESTORAGECONNECTIONSTRING - Azure 存储帐户的连接字符串
    • BLOBCONTAINERNAME - Blob 容器的名称
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACE-CONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.chinacloudapi.cn";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
    // Azure storage connection string
    const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING";
    
    // Azure blob container name
    const string blobContainerName = "BLOBCONTAINERNAME";
    
  3. 生成项目并确保没有错误。

  4. 运行接收器应用程序。

  5. 应会看到一条消息,指出事件中心已收到事件。

    Received order with ID: 1234, amount: 45.29, description: First sample order.
    

    这些事件是前面通过运行发送器程序发送到事件中心的三个事件。

Samples

请参阅适用于 .NET 的 Azure 架构注册表 Apache Avro 客户端库

清理资源

删除事件中心命名空间或删除包含命名空间的资源组。

后续步骤