使用事件中心和 .NET 发送并接收 Atlas Kafka 主题消息

本快速入门介绍如何发送和接收 Atlas Kafka 主题事件。 我们将使用 Azure 事件中心和 Azure.Messaging.EventHubs .NET 库。

先决条件

如果你不熟悉事件中心,请在阅读本快速入门之前参阅事件中心概述

若要遵循本快速入门,需要具备某些先决条件:

  • Azure 订阅。 若要使用 Azure 服务(包括事件中心),需要一个 Azure 订阅。 如果没有 Azure 帐户,可以注册试用帐户,也可以在创建帐户时使用 MSDN 订阅者权益。
  • Microsoft Visual Studio 2022。 事件中心客户端库利用 C# 8.0 中引入的新功能。 你仍可使用以前的 C# 版本的库,但新语法将不可用。 若要使用完整语法,建议使用 .NET Core SDK 3.0 或更高版本进行编译,并将语言版本 设置为 latest。 如果使用的 Visual Studio 版本是 Visual Studio 2019 以前的版本,则它不具有生成 C# 8.0 项目所需的工具。 可在此处下载 Visual Studio 2022(包括免费的 Community Edition)。
  • 活动的 Microsoft Purview 帐户
  • 使用 Microsoft Purview 帐户配置的用于发送和接收消息的事件中心
    • 你的帐户可能已配置。 可以在 Azure 门户的“设置”、“Kafka 配置”下检查 Microsoft Purview 帐户。 如果尚未配置,请按照本指南执行操作。

将消息发布到 Microsoft Purview

创建 .NET Core 控制台应用程序,用于通过事件中心 Kafka 主题 ATLAS_HOOK 向 Microsoft Purview 发送事件。

若要将消息发布到 Microsoft Purview,将需要一个托管事件中心,或至少一个具有挂钩配置的事件中心

创建 Visual Studio 项目

接下来,在 Visual Studio 中创建 C# .NET 控制台应用程序:

  1. 启动 Visual Studio
  2. 在“开始”窗口中,选择“创建新项目”>“控制台应用(.NET Framework)” 。 需要 .NET 4.5.2 或更高版本。
  3. 在“项目名称”中,输入“PurviewKafkaProducer”。
  4. 选择“创建”来创建项目。

创建控制台应用程序

  1. 启动 Visual Studio 2022。
  2. 选择“创建新项目”。
  3. 在“创建新项目”对话框中执行以下步骤:如果看不到此对话框,请在菜单中选择“文件”,然后依次选择“新建”、“项目”。
    1. 选择“C#”作为编程语言。
    2. 选择“控制台”作为应用程序类型。
    3. 从结果列表中选择“控制台应用(.NET Core)”。
    4. 然后,选择“下一步”。

添加事件中心 NuGet 包

  1. 在菜单中选择“工具”>“NuGet 包管理器”>“包管理器控制台”

  2. 运行以下命令以安装 Azure.Messaging.EventHubs NuGet 包和 Azure.Messaging.EventHubs.Producer NuGet 包:

    Install-Package Azure.Messaging.EventHubs
    
    Install-Package Azure.Messaging.EventHubs.Producer
    

编写用于将消息发送到事件中心的代码

  1. 在 Program.cs 文件顶部添加以下 using 语句:

    using System;
    using System.Text;
    using System.Threading.Tasks;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Producer;
    
  2. 将事件中心连接字符串和事件中心名称的常量添加到 Program 类。

    private const string connectionString = "<EVENT HUBS NAMESPACE - CONNECTION STRING>";
    private const string eventHubName = "<EVENT HUB NAME>";
    
  3. Main 方法替换为以下 async Main 方法,并添加 async ProduceMessage 以将消息推送到 Microsoft Purview。 有关详细信息,请参阅代码中的注释。

        static async Task Main()
        {
            // Read from the default consumer group: $Default
            string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
    
     		/ Create an event producer client to add events in the event hub
            EventHubProducerClient producer = new EventHubProducerClient(ehubNamespaceConnectionString, eventHubName);
    
     		await ProduceMessage(producer);
        }
    
     	static async Task ProduceMessage(EventHubProducerClient producer)
    
        {
     		// Create a batch of events 
     		using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();
    
     		// Add events to the batch. An event is a represented by a collection of bytes and metadata. 
     		eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<First event>")));
     		eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<Second event>")));
     		eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<Third event>")));
    
     		// Use the producer client to send the batch of events to the event hub
     		await producerClient.SendAsync(eventBatch);
     		Console.WriteLine("A batch of 3 events has been published.");
    
     	}
    
  4. 生成项目。 请确保没有任何错误。

  5. 运行程序并等待出现确认消息。

    注意

    有关包含更详细注释的完整源代码,请参阅 GitHub 中的此文件

使用“创建实体 JSON”消息创建包含两个列的 sql 表的示例代码

	
	{
    "msgCreatedBy":"nayenama",
    "message":{
        "type":"ENTITY_CREATE_V2",
        "user":"admin",
        "entities":{
            "entities":[
                {
                    "typeName":"azure_sql_table",
                    "attributes":{
                        "owner":"admin",
                        "temporary":false,
                        "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable",
                        "name":"SalesOrderTable",
                        "description":"Sales Order Table added via Kafka"
                    },
                    "relationshipAttributes":{
                        "columns":[
                            {
                                "guid":"-1102395743156037",
                                "typeName":"azure_sql_column",
                                "uniqueAttributes":{
                                    "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderID"
                                }
                            },
                            {
                                "guid":"-1102395743156038",
                                "typeName":"azure_sql_column",
                                "uniqueAttributes":{
                                    "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderDate"
                                }
                            }
                        ]
                    },
                    "guid":"-1102395743156036",
                    "version":0
                }
            ],
            "referredEntities":{
                "-1102395743156037":{
                    "typeName":"azure_sql_column",
                    "attributes":{
                        "owner":null,
                        "userTypeId":61,
                        "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderID",
                        "precision":23,
                        "length":8,
                        "description":"Sales Order ID",
                        "scale":3,
                        "name":"OrderID",
                        "data_type":"int"
                    },
                    "relationshipAttributes":{
                        "table":{
                            "guid":"-1102395743156036",
                            "typeName":"azure_sql_table",
                            "entityStatus":"ACTIVE",
                            "displayText":"SalesOrderTable",
                            "uniqueAttributes":{
                                "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable"
                            }
                        }
                    },
                    "guid":"-1102395743156037",
                    "version":2
                },
                "-1102395743156038":{
                    "typeName":"azure_sql_column",
                    "attributes":{
                        "owner":null,
                        "userTypeId":61,
                        "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderDate",
                        "description":"Sales Order Date",
                        "scale":3,
                        "name":"OrderDate",
                        "data_type":"datetime"
                    },
                    "relationshipAttributes":{
                        "table":{
                            "guid":"-1102395743156036",
                            "typeName":"azure_sql_table",
                            "entityStatus":"ACTIVE",
                            "displayText":"SalesOrderTable",
                            "uniqueAttributes":{
                                "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable"
                            }
                        }
                    },
                    "guid":"-1102395743156038",
                    "status":"ACTIVE",
                    "createdBy":"ServiceAdmin",
                    "version":0
                }
            }
        }
    },
    "version":{
        "version":"1.0.0"
    },
    "msgCompressionKind":"NONE",
    "msgSplitIdx":1,
    "msgSplitCount":1
}


接收 Microsoft Purview 消息

接下来介绍如何编写一个使用事件处理器从事件中心接收消息的 .NET Core 控制台应用程序。 事件处理器从事件中心管理持久检查点和并行接收。 这简化了接收事件的过程。 需要使用 ATLAS_ENTITIES 事件中心从 Microsoft Purview 接收消息。

若要从 Microsoft Purview 接收消息,将需要一个托管事件中心,或一个事件中心通知配置

警告

事件中心 SDK 使用可用的最新版本的存储 API。 该版本在 Stack Hub 平台上可能不可用。 如果在 Azure Stack Hub 上运行此代码,除非面向所使用的特定版本,否则会遇到运行时错误。 如果要将 Azure Blob 存储用作检查点存储,请查看 Azure Stack Hub 内部版本支持的 Azure 存储 API 版本,并在代码中面向此版本。

存储服务的最高可用版本是版本 2019-02-02。 默认情况下,事件中心 SDK 客户端库使用 Azure 上的最高可用版本(在 SDK 发布时为 2019-07-07)。 如果要使用 Azure Stack Hub 版本 2005,除了按照本部分中的步骤操作外,还需要添加面向存储服务 API 版本 2019-02-02 的代码。 若要了解如何面向特定的存储 API 版本,请参阅 GitHub 中的此示例

创建 Azure 存储和 Blob 容器

我们将使用 Azure 存储作为检查点存储。 使用以下步骤创建一个 Azure 存储帐户。

  1. 创建 Azure 存储帐户

  2. 创建一个 blob 容器

  3. 获取存储帐户的连接字符串

    请记下该连接字符串和容器名称。 稍后要在接收代码中使用这些信息。

为接收器创建 Visual Studio 项目

  1. 在“解决方案资源管理器”窗口中,选择并按住(右键单击)EventHubQuickStart 解决方案,指向“添加”,然后选择“新建项目” 。
  2. 依次选择“控制台应用(.NET Core)”、“下一步”。
  3. 输入 PurviewKafkaConsumer 作为“项目名称”,然后选择“创建”。

添加事件中心 NuGet 包

  1. 在菜单中选择“工具”>“NuGet 包管理器”>“包管理器控制台”

  2. 运行以下命令安装 Azure.Messaging.EventHubs NuGet 包:

    Install-Package Azure.Messaging.EventHubs
    
  3. 运行以下命令安装 Azure.Messaging.EventHubs.Processor NuGet 包:

    Install-Package Azure.Messaging.EventHubs.Processor
    

更新 Main 方法

  1. 在 Program.cs 文件顶部添加以下 using 语句。

    using System;
    using System.Text;
    using System.Threading.Tasks;
    using Azure.Storage.Blobs;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Consumer;
    using Azure.Messaging.EventHubs.Processor;
    
  2. 将事件中心连接字符串和事件中心名称的常量添加到 Program 类。 将括号中的占位符替换为创建事件中心和存储帐户时获取的实际值(访问密钥 - 主连接字符串)。 请确保 {Event Hubs namespace connection string} 是命名空间级别的连接字符串,而不是事件中心字符串。

        private const string ehubNamespaceConnectionString = "<EVENT HUBS NAMESPACE - CONNECTION STRING>";
        private const string eventHubName = "<EVENT HUB NAME>";
        private const string blobStorageConnectionString = "<AZURE STORAGE CONNECTION STRING>";
        private const string blobContainerName = "<BLOB CONTAINER NAME>";
    

    将消息发送到 Microsoft Purview 时,使用 ATLAS_ENTITIES 作为事件中心名称。

  3. Main 方法替换为以下 async Main 方法。 有关详细信息,请参阅代码中的注释。

        static async Task Main()
        {
            // Read from the default consumer group: $Default
            string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
    
            // Create a blob container client that the event processor will use 
            BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
    
            // Create an event processor client to process events in the event hub
            EventProcessorClient processor = new EventProcessorClient(storageClient, consumerGroup, ehubNamespaceConnectionString, eventHubName);
    
            // Register handlers for processing events and handling errors
            processor.ProcessEventAsync += ProcessEventHandler;
            processor.ProcessErrorAsync += ProcessErrorHandler;
    
            // Start the processing
            await processor.StartProcessingAsync();
    
            // Wait for 10 seconds for the events to be processed
            await Task.Delay(TimeSpan.FromSeconds(10));
    
            // Stop the processing
            await processor.StopProcessingAsync();
        }    
    
  4. 现在,将以下事件和错误处理程序方法添加到类中。

        static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
        {
            // Write the body of the event to the console window
            Console.WriteLine("\tReceived event: {0}", Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray()));
    
            // Update checkpoint in the blob storage so that the app receives only new events the next time it's run
            await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
        }
    
        static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
        {
            // Write details about the error to the console window
            Console.WriteLine($"\tPartition '{ eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen.");
            Console.WriteLine(eventArgs.Exception.Message);
            return Task.CompletedTask;
        }    
    
  5. 生成项目。 请确保没有任何错误。

    注意

    有关包含更详细注释的完整源代码,请参阅 GitHub 上的此文件

  6. 运行接收器应用程序。

从 Microsoft Purview 收到的消息示例

{
	"version":
		{"version":"1.0.0",
		 "versionParts":[1]
		},
		 "msgCompressionKind":"NONE",
		 "msgSplitIdx":1,
		 "msgSplitCount":1,
		 "msgSourceIP":"10.244.155.5",
		 "msgCreatedBy":
		 "",
		 "msgCreationTime":1618588940869,
		 "message":{
			"type":"ENTITY_NOTIFICATION_V2",
			"entity":{
				"typeName":"azure_sql_table",
					"attributes":{
						"owner":"admin",
						"createTime":0,
						"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable",
						"name":"SalesOrderTable",
						"description":"Sales Order Table"
						},
						"guid":"ead5abc7-00a4-4d81-8432-d5f6f6f60000",
						"status":"ACTIVE",
						"displayText":"SalesOrderTable"
					},
					"operationType":"ENTITY_UPDATE",
					"eventTime":1618588940567
				}
}

后续步骤

在 GitHub 中查看更多示例。