使用 C# 为 Azure 数据资源管理器创建事件网格数据连接
Azure 数据资源管理器是一项快速且高度可缩放的数据探索服务,适用于日志和遥测数据。 Azure 数据资源管理器提供了从事件中心、IoT 中心和写入 blob 容器的 blob 引入数据(数据加载)的功能。
在本文中,你将使用 C# 为 Azure 数据资源管理器创建事件网格数据连接。
必备条件
- Visual Studio 2019,下载并使用免费的 Visual Studio 2019 Community Edition。 在安装 Visual Studio 的过程中,请确保启用“Azure 开发”。
- Azure 订阅。 创建 Azure 帐户。
- 创建群集和数据库。
- 创建表和列映射
- 设置数据库和表策略(可选)
- 创建包含事件网格订阅的存储帐户。
安装 C# NuGet
身份验证
若要运行以下示例,需要可以访问资源的 Azure Active Directory (Azure AD) 应用程序和服务主体。 若要创建免费的 Azure AD 应用程序并在订阅级别添加角色分配,请参阅创建 Azure AD 应用程序。 还需要目录(租户)ID、应用程序 ID 和客户端密码。
添加事件网格数据连接
以下示例演示如何以编程方式添加事件网格数据连接。 请参阅在 Azure 数据资源管理器中创建事件网格数据连接,以使用 Azure 门户添加事件网格数据连接。
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Directory (tenant) ID
var clientId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Application ID
var clientSecret = "PlaceholderClientSecret";//Client Secret
var subscriptionId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var authenticationContext = new AuthenticationContext($"https://login.chinacloudapi.cn/{tenantId}");
var credential = new ClientCredential(clientId, clientSecret);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.chinacloudapi.cn/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var resourceGroupName = "testrg";
//The cluster and database that are created as part of the Prerequisites
var clusterName = "mykustocluster";
var databaseName = "mykustodatabase";
var dataConnectionName = "myeventhubconnect";
//The event hub and storage account that are created as part of the Prerequisites
var eventHubResourceId = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
var storageAccountResourceId = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.Storage/storageAccounts/xxxxxx";
var consumerGroup = "$Default";
var location = "China East 2";
//The table and column mapping are created as part of the Prerequisites
var tableName = "StormEvents";
var mappingRuleName = "StormEvents_CSV_Mapping";
var dataFormat = DataFormat.CSV;
var blobStorageEventType = "Microsoft.Storage.BlobCreated";
var databaseRouting = "Multi";
await kustoManagementClient.DataConnections.CreateOrUpdateAsync(resourceGroupName, clusterName, databaseName, dataConnectionName,
new EventGridDataConnection(storageAccountResourceId, eventHubResourceId, consumerGroup, tableName: tableName, location: location, mappingRuleName: mappingRuleName, dataFormat: dataFormat, blobStorageEventType: blobStorageEventType, databaseRouting: databaseRouting));
设置 | 建议的值 | 字段说明 |
---|---|---|
tenantId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | 租户 ID。 也称为目录 ID。 |
subscriptionId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | 用于创建资源的订阅 ID。 |
clientId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | 可以访问租户中资源的应用程序的客户端 ID。 |
clientSecret | PlaceholderClientSecret | 可以访问租户中资源的应用程序的客户端密码。 |
resourceGroupName | testrg | 包含群集的资源组的名称。 |
clusterName | mykustocluster | 群集的名称。 |
databaseName | mykustodatabase | 群集中目标数据库的名称。 |
dataConnectionName | myeventhubconnect | 所需的数据连接名称。 |
tableName | StormEvents | 目标数据库中目标表的名称。 |
mappingRuleName | StormEvents_CSV_Mapping | 与目标表相关的列映射的名称。 |
dataFormat | csv | 消息的数据格式。 |
eventHubResourceId | 资源 ID | 将事件网格配置为发送事件的事件中心的资源 ID。 |
storageAccountResourceId | 资源 ID | 包含要引入数据的存储帐户的资源 ID。 |
consumerGroup | $Default | 事件中心的使用者组。 |
location | 中国东部 2 | 数据连接资源的位置。 |
blobStorageEventType | Microsoft.Storage.BlobCreated | 触发引入的事件类型。 支持的事件为:Microsoft.Storage.BlobCreated 或 Microsoft.Storage.BlobRenamed。 仅 ADLSv2 存储支持 Blob 重命名。 |
databaseRouting | 多或单 | 连接的数据库路由。 如果将此值设置为“单”,数据连接将按“databaseName”设置中指定的那样路由到群集中的单个数据库。 如果将此值设置为“多”,可使用数据库引入属性重写默认目标数据库。 有关详细信息,请参阅事件路由。 |
生成示例数据
连接 Azure 数据资源管理器和存储帐户后,可以创建示例数据并将其上传到存储。
注意
Azure 数据资源管理器在引入后不会删除 blob。 使用 Azure Blob 存储生命周期管理 blob 删除,将 blob 保留三到五天。
使用 Azure Blob 存储 SDK 上传文件
下面的代码片段在存储帐户中创建新容器,将现有文件(作为 blob)上传到该容器,然后列出容器中的 blob。
var azureStorageAccountConnectionString=<storage_account_connection_string>;
var containerName = <container_name>;
var blobName = <blob_name>;
var localFileName = <file_to_upload>;
var uncompressedSizeInBytes = <uncompressed_size_in_bytes>;
var mapping = <mappingReference>;
// Creating the container
var azureStorageAccount = CloudStorageAccount.Parse(azureStorageAccountConnectionString);
var blobClient = azureStorageAccount.CreateCloudBlobClient();
var container = blobClient.GetContainerReference(containerName);
container.CreateIfNotExists();
// Set metadata and upload file to blob
var blob = container.GetBlockBlobReference(blobName);
blob.Metadata.Add("rawSizeBytes", uncompressedSizeInBytes);
blob.Metadata.Add("kustoIngestionMappingReference", mapping);
blob.UploadFromFile(localFileName);
// List blobs
var blobs = container.ListBlobs();
使用 Azure Data Lake SDK 上传文件
使用 Data Lake Storage Gen2 时,可以使用 Azure Data Lake SDK 将文件上传到存储。 以下代码片段使用 Azure.Storage.Files.DataLake v12.5.0 在 Azure Data Lake Storage 中创建新的文件系统,并将具有元数据的本地文件上传到该文件系统。
var accountName = <storage_account_name>;
var accountKey = <storage_account_key>;
var fileSystemName = <file_system_name>;
var fileName = <file_name>;
var localFileName = <file_to_upload>;
var uncompressedSizeInBytes = <uncompressed_size_in_bytes>;
var mapping = <mapping_reference>;
var sharedKeyCredential = new StorageSharedKeyCredential(accountName, accountKey);
var dfsUri = "https://" + accountName + ".dfs.core.chinacloudapi.cn";
var dataLakeServiceClient = new DataLakeServiceClient(new Uri(dfsUri), sharedKeyCredential);
// Create the filesystem
var dataLakeFileSystemClient = dataLakeServiceClient.CreateFileSystem(fileSystemName).Value;
// Define metadata
IDictionary<String, String> metadata = new Dictionary<string, string>();
metadata.Add("rawSizeBytes", uncompressedSizeInBytes);
metadata.Add("kustoIngestionMappingReference", mapping);
// Set uploading options
var uploadOptions = new DataLakeFileUploadOptions
{
Metadata = metadata,
Close = true // Note: The close option triggers the event being processed by the data connection
};
// Write to the file
var dataLakeFileClient = dataLakeFileSystemClient.GetFileClient(fileName);
dataLakeFileClient.Upload(localFileName, uploadOptions);
注意
使用 Azure Data Lake SDK 上传文件时,文件创建过程将触发一个大小为 0 的事件网格事件,并且该事件会被 Azure 数据资源管理器忽略。 如果 Close 参数设置为“true”,文件刷新会触发另一个事件 。 此事件表示这是最后一次更新,且文件流已关闭。 此事件由事件网格数据连接进行处理。 在上述代码片段中,上传方法会在文件上传完成时触发刷新。 因此,必须定义设置为“true”的 Close 参数 。 有关刷新的详细信息,请参阅 Azure Data Lake 刷新方法。
使用 Azure Data Lake SDK 重命名文件
以下代码片段使用 Azure Data Lake SDK 重命名 ADLSv2 存储帐户中的 blob。
var accountName = <storage_account_name>;
var accountKey = <storage_account_key>;
var fileSystemName = <file_system_name>;
var sourceFilePath = <source_file_path>;
var destinationFilePath = <destination_file_path>;
var sharedKeyCredential = new StorageSharedKeyCredential(accountName, accountKey);
var dfsUri = "https://" + accountName + ".dfs.core.chinacloudapi.cn";
var dataLakeServiceClient = new DataLakeServiceClient(new Uri(dfsUri), sharedKeyCredential);
// Get a client to the the filesystem
var dataLakeFileSystemClient = dataLakeServiceClient.GetFileSystemClient(fileSystemName);
// Rename a file in the file system
var dataLakeFileClient = dataLakeFileSystemClient.GetFileClient(sourceFilePath);
dataLakeFileClient.Rename(destinationFilePath);
注意
- 可以在 ADLSv2 中重命名目录,但不会触发“已重命名 blob”事件,也不会在目录中引入 blob。 若要在重命名后引入 blob,请直接重命名所需的 blob。
- 如果在创建数据连接时,或者在手动创建事件网格资源时将筛选器定义为跟踪特定主题,则这些筛选器将应用于目标文件路径。
清理资源
若要删除数据连接,请使用以下命令:
kustoManagementClient.DataConnections.Delete(resourceGroupName, clusterName, databaseName, dataConnectionName);