Create an Event Grid data connection for Azure Data Explorer by using C#

Azure Data Explorer is a fast and highly scalable data exploration service for log and telemetry data. Azure Data Explorer offers ingestion (data loading) from Event Hubs, IoT Hubs, and blobs written to blob containers.

In this article, you create an Event Grid data connection for Azure Data Explorer by using C#.

Prerequisites

Install the C# NuGet packages

Authentication

To run the following example, you need an Azure Active Directory (Azure AD) application and service principal that can access resources. To create a free Azure AD application and add a role assignment at the subscription level, see Create an Azure AD application. You also need the directory (tenant) ID, application ID, and client secret.
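
The code in the next section acquires a token with the older ADAL library (AuthenticationContext). If you prefer the newer MSAL library (Microsoft.Identity.Client), the following minimal sketch shows an equivalent client-credentials token acquisition. It assumes the Azure China endpoints used later in this article and the tenantId, clientId, and clientSecret values described above.

// Sketch: acquire a management-plane token with MSAL instead of ADAL.
// Assumes the Microsoft.Identity.Client NuGet package is installed.
var app = ConfidentialClientApplicationBuilder
    .Create(clientId)
    .WithClientSecret(clientSecret)
    .WithAuthority($"https://login.chinacloudapi.cn/{tenantId}")
    .Build();

var authResult = await app
    .AcquireTokenForClient(new[] { "https://management.core.chinacloudapi.cn/.default" })
    .ExecuteAsync();

// authResult.AccessToken can then be wrapped in TokenCredentials, as shown in the next section.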

Add an Event Grid data connection

The following example shows you how to add an Event Grid data connection programmatically. To add an Event Grid data connection by using the Azure portal, see Create an Event Grid data connection in Azure Data Explorer.

var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Directory (tenant) ID
var clientId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Application ID
var clientSecret = "xxxxxxxxxxxxxx";//Client Secret
var subscriptionId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var authenticationContext = new AuthenticationContext($"https://login.chinacloudapi.cn/{tenantId}");
var credential = new ClientCredential(clientId, clientSecret);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.chinacloudapi.cn/", clientCredential: credential);

var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);

var kustoManagementClient = new KustoManagementClient(credentials)
{
    SubscriptionId = subscriptionId
};

var resourceGroupName = "testrg";
//The cluster and database that are created as part of the Prerequisites
var clusterName = "mykustocluster";
var databaseName = "mykustodatabase";
var dataConnectionName = "myeventhubconnect";
//The event hub and storage account that are created as part of the Prerequisites
var eventHubResourceId = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
var storageAccountResourceId = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.Storage/storageAccounts/xxxxxx";
var consumerGroup = "$Default";
var location = "China East 2";
//The table and column mapping are created as part of the Prerequisites
var tableName = "StormEvents";
var mappingRuleName = "StormEvents_CSV_Mapping";
var dataFormat = DataFormat.CSV;
var blobStorageEventType = "Microsoft.Storage.BlobCreated";

await kustoManagementClient.DataConnections.CreateOrUpdateAsync(resourceGroupName, clusterName, databaseName, dataConnectionName,
    new EventGridDataConnection(storageAccountResourceId, eventHubResourceId, consumerGroup, tableName: tableName, location: location, mappingRuleName: mappingRuleName, dataFormat: dataFormat, blobStorageEventType: blobStorageEventType));

| Setting | Suggested value | Field description |
|---|---|---|
| tenantId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | Your tenant ID. Also known as the directory ID. |
| subscriptionId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | The subscription ID that you use for resource creation. |
| clientId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | The client ID of the application that can access resources in your tenant. |
| clientSecret | xxxxxxxxxxxxxx | The client secret of the application that can access resources in your tenant. |
| resourceGroupName | testrg | The name of the resource group that contains your cluster. |
| clusterName | mykustocluster | The name of your cluster. |
| databaseName | mykustodatabase | The name of the target database in your cluster. |
| dataConnectionName | myeventhubconnect | The desired name of your data connection. |
| tableName | StormEvents | The name of the target table in the target database. |
| mappingRuleName | StormEvents_CSV_Mapping | The name of the column mapping related to the target table. |
| dataFormat | csv | The data format of the message. |
| eventHubResourceId | Resource ID | The resource ID of your event hub where Event Grid is configured to send events. |
| storageAccountResourceId | Resource ID | The resource ID of your storage account that holds the data for ingestion. |
| consumerGroup | $Default | The consumer group of your event hub. |
| location | China East 2 | The location of the data connection resource. |
| blobStorageEventType | Microsoft.Storage.BlobCreated | The type of event that triggers ingestion. Supported events are Microsoft.Storage.BlobCreated and Microsoft.Storage.BlobRenamed. Blob renaming is supported only for ADLSv2 storage. |
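
To confirm that the connection was created, you can retrieve it with the same management client. This is a minimal sketch, not part of the original walkthrough; it reuses kustoManagementClient and the name variables defined above.

// Sketch: fetch the data connection that was just created and print its name.
var createdDataConnection = await kustoManagementClient.DataConnections.GetAsync(
    resourceGroupName, clusterName, databaseName, dataConnectionName);
Console.WriteLine(createdDataConnection.Name);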

Generate sample data

Now that Azure Data Explorer and the storage account are connected, you can create sample data and upload it to the storage account.

Note

Azure Data Explorer won't delete the blobs after ingestion. Retain the blobs for three to five days, and then use Azure Blob storage lifecycle management to delete them.

Upload a file by using the Azure Blob Storage SDK

The following code snippet creates a new container in your storage account, uploads an existing file (as a blob) to that container, and then lists the blobs in the container.

var azureStorageAccountConnectionString = <storage_account_connection_string>;

var containerName = <container_name>;
var blobName = <blob_name>;
var localFileName = <file_to_upload>;
var uncompressedSizeInBytes = <uncompressed_size_in_bytes>;
var mapping = <mappingReference>;

// Creating the container
var azureStorageAccount = CloudStorageAccount.Parse(azureStorageAccountConnectionString);
var blobClient = azureStorageAccount.CreateCloudBlobClient();
var container = blobClient.GetContainerReference(containerName);
container.CreateIfNotExists();

// Set metadata and upload file to blob
var blob = container.GetBlockBlobReference(blobName);
blob.Metadata.Add("rawSizeBytes", uncompressedSizeInBytes);
blob.Metadata.Add("kustoIngestionMappingReference", mapping);
blob.UploadFromFile(localFileName);

// List blobs
var blobs = container.ListBlobs();
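
If you want to inspect the listing result, the following minimal sketch iterates over it with the same classic SDK types used above.

// Sketch: print the name and size of each block blob returned by ListBlobs.
foreach (var item in blobs)
{
    if (item is CloudBlockBlob blockBlob)
    {
        Console.WriteLine($"{blockBlob.Name} ({blockBlob.Properties.Length} bytes)");
    }
}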

Upload a file by using the Azure Data Lake SDK

When working with Data Lake Storage Gen2, you can use the Azure Data Lake SDK to upload files to storage. The following code snippet uses Azure.Storage.Files.DataLake v12.5.0 to create a new filesystem in Azure Data Lake Storage and to upload a local file with metadata to that filesystem.

var accountName = <storage_account_name>;
var accountKey = <storage_account_key>;
var fileSystemName = <file_system_name>;
var fileName = <file_name>;
var localFileName = <file_to_upload>;
var uncompressedSizeInBytes = <uncompressed_size_in_bytes>;
var mapping = <mapping_reference>;

var sharedKeyCredential = new StorageSharedKeyCredential(accountName, accountKey);
var dfsUri = "https://" + accountName + ".dfs.core.chinacloudapi.cn";
var dataLakeServiceClient = new DataLakeServiceClient(new Uri(dfsUri), sharedKeyCredential);

// Create the filesystem
var dataLakeFileSystemClient = dataLakeServiceClient.CreateFileSystem(fileSystemName).Value;

// Define metadata
IDictionary<String, String> metadata = new Dictionary<string, string>();
metadata.Add("rawSizeBytes", uncompressedSizeInBytes);
metadata.Add("kustoIngestionMappingReference", mapping);

// Set uploading options
var uploadOptions = new DataLakeFileUploadOptions
{
    Metadata = metadata,
    Close = true // Note: The close option triggers the event being processed by the data connection
};

// Write to the file
var dataLakeFileClient = dataLakeFileSystemClient.GetFileClient(fileName);
dataLakeFileClient.Upload(localFileName, uploadOptions);

Note

When you use the Azure Data Lake SDK to upload a file, file creation triggers an Event Grid event with size 0, and Azure Data Explorer ignores this event. File flushing triggers another event if the Close parameter is set to true. This event indicates that this is the final update and that the file stream has been closed, and the Event Grid data connection processes it. In the preceding code snippet, the Upload method triggers flushing when the file upload is finished, so the Close parameter must be set to true. For more information about flushing, see the Azure Data Lake flush method.
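
If you write the file in pieces instead of calling Upload, the same behavior can be achieved by appending the content and then flushing with close set to true, so the final flush raises the event that the data connection processes. The following is a minimal sketch under that assumption; it reuses the fileName, localFileName, and metadata values from the snippet above, and the exact overloads may differ between SDK versions.

// Sketch: create the file with the ingestion metadata, append the content,
// and flush with close: true so the final flush-with-close event is raised.
var fileClient = dataLakeFileSystemClient.GetFileClient(fileName);
fileClient.Create(metadata: metadata);

using (var stream = File.OpenRead(localFileName))
{
    fileClient.Append(stream, offset: 0);
    fileClient.Flush(position: stream.Length, close: true);
}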

Rename a file by using the Azure Data Lake SDK

The following code snippet uses the Azure Data Lake SDK to rename a blob in an ADLSv2 storage account.

var accountName = <storage_account_name>;
var accountKey = <storage_account_key>;
var fileSystemName = <file_system_name>;
var sourceFilePath = <source_file_path>;
var destinationFilePath = <destination_file_path>;

var sharedKeyCredential = new StorageSharedKeyCredential(accountName, accountKey);
var dfsUri = "https://" + accountName + ".dfs.core.chinacloudapi.cn";
var dataLakeServiceClient = new DataLakeServiceClient(new Uri(dfsUri), sharedKeyCredential);

// Get a client for the filesystem
var dataLakeFileSystemClient = dataLakeServiceClient.GetFileSystemClient(fileSystemName);

// Rename a file in the file system
var dataLakeFileClient = dataLakeFileSystemClient.GetFileClient(sourceFilePath);
dataLakeFileClient.Rename(destinationFilePath);

Note

  • Renaming a directory is possible in ADLSv2, but it doesn't trigger blob-renamed events or ingestion of the blobs inside the directory. To ingest blobs after renaming, directly rename the desired blobs (see the sketch after this list).
  • If you defined filters to track specific subjects when you created the data connection, or when you created the Event Grid resources manually, these filters are applied to the destination file path.
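
Because renaming a directory doesn't trigger ingestion, one option is to enumerate the files under the directory and rename each one individually. The following is a minimal sketch of that approach; it reuses dataLakeFileSystemClient from the snippet above, and sourceDirectory and destinationDirectory are hypothetical example paths.

// Sketch: rename every file under a directory one by one so that each rename
// raises its own blob-renamed event. Directory paths below are examples only.
var sourceDirectory = "raw/2024";
var destinationDirectory = "ingested/2024";

foreach (var pathItem in dataLakeFileSystemClient.GetPaths(sourceDirectory, recursive: true))
{
    if (pathItem.IsDirectory == true)
    {
        continue;
    }

    var fileClient = dataLakeFileSystemClient.GetFileClient(pathItem.Name);
    var newPath = destinationDirectory + pathItem.Name.Substring(sourceDirectory.Length);
    fileClient.Rename(newPath);
}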

Clean up resources

To delete the data connection, use the following command:

kustoManagementClient.DataConnections.Delete(resourceGroupName, clusterName, databaseName, dataConnectionName);