使用 C# 为 Azure 数据资源管理器创建事件网格数据连接

Azure 数据资源管理器是一项快速且高度可缩放的数据探索服务,适用于日志和遥测数据。 Azure 数据资源管理器提供了从事件中心、IoT 中心和写入 blob 容器的 blob 引入数据(数据加载)的功能。

在本文中,你将使用 C# 为 Azure 数据资源管理器创建事件网格数据连接。

必备条件

安装 C# NuGet

身份验证

若要运行以下示例,需要可以访问资源的 Azure Active Directory (Azure AD) 应用程序和服务主体。 若要创建免费的 Azure AD 应用程序并在订阅级别添加角色分配,请参阅创建 Azure AD 应用程序。 还需要目录(租户)ID、应用程序 ID 和客户端密码。

添加事件网格数据连接

以下示例演示如何以编程方式添加事件网格数据连接。 请参阅在 Azure 数据资源管理器中创建事件网格数据连接,以使用 Azure 门户添加事件网格数据连接。

var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Directory (tenant) ID
var clientId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Application ID
var clientSecret = "PlaceholderClientSecret";//Client Secret
var subscriptionId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var authenticationContext = new AuthenticationContext($"https://login.chinacloudapi.cn/{tenantId}");
var credential = new ClientCredential(clientId, clientSecret);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.chinacloudapi.cn/", clientCredential: credential);

var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);

var kustoManagementClient = new KustoManagementClient(credentials)
{
    SubscriptionId = subscriptionId
};

var resourceGroupName = "testrg";
//The cluster and database that are created as part of the Prerequisites
var clusterName = "mykustocluster";
var databaseName = "mykustodatabase";
var dataConnectionName = "myeventhubconnect";
//The event hub and storage account that are created as part of the Prerequisites
var eventHubResourceId = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
var storageAccountResourceId = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.Storage/storageAccounts/xxxxxx";
var consumerGroup = "$Default";
var location = "China East 2";
//The table and column mapping are created as part of the Prerequisites
var tableName = "StormEvents";
var mappingRuleName = "StormEvents_CSV_Mapping";
var dataFormat = DataFormat.CSV;
var blobStorageEventType = "Microsoft.Storage.BlobCreated";
var databaseRouting = "Multi";

await kustoManagementClient.DataConnections.CreateOrUpdateAsync(resourceGroupName, clusterName, databaseName, dataConnectionName,
    new EventGridDataConnection(storageAccountResourceId, eventHubResourceId, consumerGroup, tableName: tableName, location: location, mappingRuleName: mappingRuleName, dataFormat: dataFormat, blobStorageEventType: blobStorageEventType, databaseRouting: databaseRouting));
设置 建议的值 字段说明
tenantId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx 租户 ID。 也称为目录 ID。
subscriptionId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx 用于创建资源的订阅 ID。
clientId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx 可以访问租户中资源的应用程序的客户端 ID。
clientSecret PlaceholderClientSecret 可以访问租户中资源的应用程序的客户端密码。
resourceGroupName testrg 包含群集的资源组的名称。
clusterName mykustocluster 群集的名称。
databaseName mykustodatabase 群集中目标数据库的名称。
dataConnectionName myeventhubconnect 所需的数据连接名称。
tableName StormEvents 目标数据库中目标表的名称。
mappingRuleName StormEvents_CSV_Mapping 与目标表相关的列映射的名称。
dataFormat csv 消息的数据格式。
eventHubResourceId 资源 ID 将事件网格配置为发送事件的事件中心的资源 ID。
storageAccountResourceId 资源 ID 包含要引入数据的存储帐户的资源 ID。
consumerGroup $Default 事件中心的使用者组。
location 中国东部 2 数据连接资源的位置。
blobStorageEventType Microsoft.Storage.BlobCreated 触发引入的事件类型。 支持的事件为:Microsoft.Storage.BlobCreated 或 Microsoft.Storage.BlobRenamed。 仅 ADLSv2 存储支持 Blob 重命名。
databaseRouting 多或单 连接的数据库路由。 如果将此值设置为“单”,数据连接将按“databaseName”设置中指定的那样路由到群集中的单个数据库。 如果将此值设置为“多”,可使用数据库引入属性重写默认目标数据库。 有关详细信息,请参阅事件路由

生成示例数据

连接 Azure 数据资源管理器和存储帐户后,可以创建示例数据并将其上传到存储。

注意

Azure 数据资源管理器在引入后不会删除 blob。 使用 Azure Blob 存储生命周期管理 blob 删除,将 blob 保留三到五天。

使用 Azure Blob 存储 SDK 上传文件

下面的代码片段在存储帐户中创建新容器,将现有文件(作为 blob)上传到该容器,然后列出容器中的 blob。

var azureStorageAccountConnectionString=<storage_account_connection_string>;

var containerName = <container_name>;
var blobName = <blob_name>;
var localFileName = <file_to_upload>;
var uncompressedSizeInBytes = <uncompressed_size_in_bytes>;
var mapping = <mappingReference>;

// Creating the container
var azureStorageAccount = CloudStorageAccount.Parse(azureStorageAccountConnectionString);
var blobClient = azureStorageAccount.CreateCloudBlobClient();
var container = blobClient.GetContainerReference(containerName);
container.CreateIfNotExists();

// Set metadata and upload file to blob
var blob = container.GetBlockBlobReference(blobName);
blob.Metadata.Add("rawSizeBytes", uncompressedSizeInBytes);
blob.Metadata.Add("kustoIngestionMappingReference", mapping);
blob.UploadFromFile(localFileName);

// List blobs
var blobs = container.ListBlobs();

使用 Azure Data Lake SDK 上传文件

使用 Data Lake Storage Gen2 时,可以使用 Azure Data Lake SDK 将文件上传到存储。 以下代码片段使用 Azure.Storage.Files.DataLake v12.5.0 在 Azure Data Lake Storage 中创建新的文件系统,并将具有元数据的本地文件上传到该文件系统。

var accountName = <storage_account_name>;
var accountKey = <storage_account_key>;
var fileSystemName = <file_system_name>;
var fileName = <file_name>;
var localFileName = <file_to_upload>;
var uncompressedSizeInBytes = <uncompressed_size_in_bytes>;
var mapping = <mapping_reference>;

var sharedKeyCredential = new StorageSharedKeyCredential(accountName, accountKey);
var dfsUri = "https://" + accountName + ".dfs.core.chinacloudapi.cn";
var dataLakeServiceClient = new DataLakeServiceClient(new Uri(dfsUri), sharedKeyCredential);

// Create the filesystem
var dataLakeFileSystemClient = dataLakeServiceClient.CreateFileSystem(fileSystemName).Value;

// Define metadata
IDictionary<String, String> metadata = new Dictionary<string, string>();
metadata.Add("rawSizeBytes", uncompressedSizeInBytes);
metadata.Add("kustoIngestionMappingReference", mapping);

// Set uploading options
var uploadOptions = new DataLakeFileUploadOptions
{
    Metadata = metadata,
    Close = true // Note: The close option triggers the event being processed by the data connection
};

// Write to the file
var dataLakeFileClient = dataLakeFileSystemClient.GetFileClient(fileName);
dataLakeFileClient.Upload(localFileName, uploadOptions);

注意

使用 Azure Data Lake SDK 上传文件时,文件创建过程将触发一个大小为 0 的事件网格事件,并且该事件会被 Azure 数据资源管理器忽略。 如果 Close 参数设置为“true”,文件刷新会触发另一个事件 。 此事件表示这是最后一次更新,且文件流已关闭。 此事件由事件网格数据连接进行处理。 在上述代码片段中,上传方法会在文件上传完成时触发刷新。 因此,必须定义设置为“true”的 Close 参数 。 有关刷新的详细信息,请参阅 Azure Data Lake 刷新方法

使用 Azure Data Lake SDK 重命名文件

以下代码片段使用 Azure Data Lake SDK 重命名 ADLSv2 存储帐户中的 blob。

var accountName = <storage_account_name>;
var accountKey = <storage_account_key>;
var fileSystemName = <file_system_name>;
var sourceFilePath = <source_file_path>;
var destinationFilePath = <destination_file_path>;

var sharedKeyCredential = new StorageSharedKeyCredential(accountName, accountKey);
var dfsUri = "https://" + accountName + ".dfs.core.chinacloudapi.cn";
var dataLakeServiceClient = new DataLakeServiceClient(new Uri(dfsUri), sharedKeyCredential);

// Get a client to the the filesystem
var dataLakeFileSystemClient = dataLakeServiceClient.GetFileSystemClient(fileSystemName);

// Rename a file in the file system
var dataLakeFileClient = dataLakeFileSystemClient.GetFileClient(sourceFilePath);
dataLakeFileClient.Rename(destinationFilePath);

注意

  • 可以在 ADLSv2 中重命名目录,但不会触发“已重命名 blob”事件,也不会在目录中引入 blob。 若要在重命名后引入 blob,请直接重命名所需的 blob。
  • 如果在创建数据连接时,或者在手动创建事件网格资源时将筛选器定义为跟踪特定主题,则这些筛选器将应用于目标文件路径。

清理资源

若要删除数据连接,请使用以下命令:

kustoManagementClient.DataConnections.Delete(resourceGroupName, clusterName, databaseName, dataConnectionName);