以端到端方式将 blob 引入到 Azure 数据资源管理器中
Azure 数据资源管理器是一项快速且可缩放的数据探索服务,适用于日志和遥测数据。 本文提供了有关如何将数据从 Azure Blob 存储引入 Azure 数据资源管理器的端到端示例。
你将了解如何以编程方式创建资源组、存储帐户和容器、事件中心以及 Azure 数据资源管理器群集和数据库。 你还将了解如何以编程方式配置 Azure 数据资源管理器以从新存储帐户引入数据。
有关基于以前的 SDK 版本的代码示例,请参阅存档的文章。
先决条件
- Azure 订阅。 创建 Azure 帐户。
- 可以访问资源的 Microsoft Entra 应用程序和服务主体。 保存“目录(租户) ID”、“应用程序 ID”和“客户端机密”值。
安装包
本文包含 C# 和 Python 的示例。 选择首选语言的选项卡,并安装所需的包。
- 安装 Azure.ResourceManager.Kusto。
- 安装 Azure.ResourceManager.EventGrid。
- 安装 Azure.Storage.Blobs。
- 安装用于身份验证的 Azure.Identity。
Azure Resource Manager 模板
在本文中,你将使用 Azure 资源管理器 (ARM) 模板来创建资源组、存储帐户和容器、事件中心以及 Azure 数据资源管理器群集和数据库。 将以下内容保存在名为 template.json 的文件中。 使用此文件来运行代码示例。
{
"$schema": "https://schema.management.azure.com/schemas/2015-01-01/deploymentTemplate.json#",
"contentVersion": "1.0.0.0",
"parameters": {
"eventHubNamespaceName": {
"type": "string",
"metadata": {
"description": "Specifies the event hub namespace name."
}
},
"eventHubName": {
"type": "string",
"metadata": {
"description": "Specifies the event hub name."
}
},
"storageAccountType": {
"type": "string",
"defaultValue": "Standard_LRS",
"allowedValues": ["Standard_LRS", "Standard_GRS", "Standard_ZRS", "Premium_LRS"],
"metadata": {
"description": "Storage Account type"
}
},
"storageAccountName": {
"type": "string",
"defaultValue": "[concat('storage', uniqueString(resourceGroup().id))]",
"metadata": {
"description": "Name of the storage account to create"
}
},
"containerName": {
"type": "string",
"defaultValue": "[concat('storagecontainer', uniqueString(resourceGroup().id))]",
"metadata": {
"description": "Name of the container in storage account to create"
}
},
"eventHubSku": {
"type": "string",
"allowedValues": ["Basic", "Standard"],
"defaultValue": "Standard",
"metadata": {
"description": "Specifies the messaging tier for the Event Hubs namespace."
}
},
"kustoClusterName": {
"type": "string",
"defaultValue": "[concat('kusto', uniqueString(resourceGroup().id))]",
"metadata": {
"description": "Name of the cluster to create"
}
},
"kustoDatabaseName": {
"type": "string",
"defaultValue": "kustodb",
"metadata": {
"description": "Name of the database to create"
}
},
"clusterPrincipalAssignmentName": {
"type": "string",
"defaultValue": "clusterPrincipalAssignment1",
"metadata": {
"description": "Specifies the name of the principal assignment"
}
},
"principalIdForCluster": {
"type": "string",
"metadata": {
"description": "Specifies the principal id. It can be user email, application (client) ID, security group name"
}
},
"roleForClusterPrincipal": {
"type": "string",
"defaultValue": "AllDatabasesViewer",
"metadata": {
"description": "Specifies the cluster principal role. It can be 'AllDatabasesAdmin', 'AllDatabasesMonitor' or 'AllDatabasesViewer'"
}
},
"tenantIdForClusterPrincipal": {
"type": "string",
"metadata": {
"description": "Specifies the tenantId of the cluster principal"
}
},
"principalTypeForCluster": {
"type": "string",
"defaultValue": "App",
"metadata": {
"description": "Specifies the principal type. It can be 'User', 'App', 'Group'"
}
},
"databasePrincipalAssignmentName": {
"type": "string",
"defaultValue": "databasePrincipalAssignment1",
"metadata": {
"description": "Specifies the name of the principal assignment"
}
},
"principalIdForDatabase": {
"type": "string",
"metadata": {
"description": "Specifies the principal id. It can be user email, application (client) ID, security group name"
}
},
"roleForDatabasePrincipal": {
"type": "string",
"defaultValue": "Admin",
"metadata": {
"description": "Specifies the database principal role. It can be 'Admin', 'Ingestor', 'Monitor', 'User', 'UnrestrictedViewers', 'Viewer'"
}
},
"tenantIdForDatabasePrincipal": {
"type": "string",
"metadata": {
"description": "Specifies the tenantId of the database principal"
}
},
"principalTypeForDatabase": {
"type": "string",
"defaultValue": "App",
"metadata": {
"description": "Specifies the principal type. It can be 'User', 'App', 'Group'"
}
},
"location": {
"type": "string",
"defaultValue": "[resourceGroup().location]",
"metadata": {
"description": "Location for all resources."
}
}
},
"variables": {
},
"resources": [{
"apiVersion": "2017-04-01",
"type": "Microsoft.EventHub/namespaces",
"name": "[parameters('eventHubNamespaceName')]",
"location": "[parameters('location')]",
"sku": {
"name": "[parameters('eventHubSku')]",
"tier": "[parameters('eventHubSku')]",
"capacity": 1
},
"properties": {
"isAutoInflateEnabled": false,
"maximumThroughputUnits": 0
}
}, {
"apiVersion": "2017-04-01",
"type": "Microsoft.EventHub/namespaces/eventhubs",
"name": "[concat(parameters('eventHubNamespaceName'), '/', parameters('eventHubName'))]",
"location": "[parameters('location')]",
"dependsOn": ["[resourceId('Microsoft.EventHub/namespaces', parameters('eventHubNamespaceName'))]"],
"properties": {
"messageRetentionInDays": 7,
"partitionCount": 1
}
}, {
"type": "Microsoft.Storage/storageAccounts",
"name": "[parameters('storageAccountName')]",
"location": "[parameters('location')]",
"apiVersion": "2018-07-01",
"sku": {
"name": "[parameters('storageAccountType')]"
},
"kind": "StorageV2",
"resources": [
{
"name": "[concat('default/', parameters('containerName'))]",
"type": "blobServices/containers",
"apiVersion": "2018-07-01",
"dependsOn": [
"[parameters('storageAccountName')]"
],
"properties": {
"publicAccess": "None"
}
}
],
"properties": {}
}, {
"name": "[parameters('kustoClusterName')]",
"type": "Microsoft.Kusto/clusters",
"sku": {
"name": "Standard_D13_v2",
"tier": "Standard",
"capacity": 2
},
"apiVersion": "2019-09-07",
"location": "[parameters('location')]",
"tags": {
"Created By": "GitHub quickstart template"
}
}, {
"name": "[concat(parameters('kustoClusterName'), '/', parameters('kustoDatabaseName'))]",
"type": "Microsoft.Kusto/clusters/databases",
"apiVersion": "2019-09-07",
"location": "[parameters('location')]",
"dependsOn": ["[resourceId('Microsoft.Kusto/clusters', parameters('kustoClusterName'))]"],
"properties": {
"softDeletePeriodInDays": 365,
"hotCachePeriodInDays": 31
}
}, {
"type": "Microsoft.Kusto/Clusters/principalAssignments",
"apiVersion": "2019-11-09",
"name": "[concat(parameters('kustoClusterName'), '/', parameters('clusterPrincipalAssignmentName'))]",
"dependsOn": ["[resourceId('Microsoft.Kusto/clusters', parameters('kustoClusterName'))]"],
"properties": {
"principalId": "[parameters('principalIdForCluster')]",
"role": "[parameters('roleForClusterPrincipal')]",
"tenantId": "[parameters('tenantIdForClusterPrincipal')]",
"principalType": "[parameters('principalTypeForCluster')]"
}
}, {
"type": "Microsoft.Kusto/Clusters/Databases/principalAssignments",
"apiVersion": "2019-11-09",
"name": "[concat(parameters('kustoClusterName'), '/', parameters('kustoDatabaseName'), '/', parameters('databasePrincipalAssignmentName'))]",
"dependsOn": ["[resourceId('Microsoft.Kusto/clusters/databases', parameters('kustoClusterName'), parameters('kustoDatabaseName'))]"],
"properties": {
"principalId": "[parameters('principalIdForDatabase')]",
"role": "[parameters('roleForDatabasePrincipal')]",
"tenantId": "[parameters('tenantIdForDatabasePrincipal')]",
"principalType": "[parameters('principalTypeForDatabase')]"
}
}
]
}
代码示例
下面的代码示例提供了将数据引入 Azure 数据资源管理器的分步过程。
首先创建一个资源组。 还将创建 Azure 资源,例如存储帐户和容器、事件中心以及 Azure 数据资源管理器群集和数据库,并添加主体。 然后,创建 Azure 事件网格订阅,并在 Azure 数据资源管理器数据库中创建表和列映射。 最后,创建数据连接,将 Azure 数据资源管理器配置为从新存储帐户引入数据。
// Service principal and subscription settings; replace the placeholders with your own values.
string tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"; // Directory (tenant) ID
string clientId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"; // Application ID
string clientSecret = "PlaceholderClientSecret"; // Client Secret
string subscriptionId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";

// Name of the ARM deployment; also used as the prefix of the generated resource names.
string deploymentName = "e2eexample";

// Authenticate as the service principal and create the management client
// with subscriptionId as its default subscription.
ClientSecretCredential credentials = new(tenantId, clientId, clientSecret);
ArmClient resourceManagementClient = new(credentials, subscriptionId);
// Step 1: look up the subscription and create (or update) the resource group
// that holds every resource created by this sample.
Console.WriteLine("Step 1: Create a new resource group in your Azure subscription to manage all the resources for using Azure Data Explorer.");
var subscriptions = resourceManagementClient.GetSubscriptions();
var subscription = (await subscriptions.GetAsync(subscriptionId)).Value;
var resourceGroups = subscription.GetResourceGroups();
var resourceGroupName = deploymentName + "resourcegroup";
// AzureLocation exposes well-known regions as PascalCase static members;
// a lowercase member name such as "chinaeast2" does not exist and fails to compile.
// The same value is later interpolated into the cluster URI and passed to the data connection.
var location = AzureLocation.ChinaEast2;
var resourceGroupData = new ResourceGroupData(location);
// WaitUntil.Completed: wait for the long-running operation to finish before reading its Value.
var resourceGroup = (await resourceGroups.CreateOrUpdateAsync(WaitUntil.Completed, resourceGroupName, resourceGroupData)).Value;
// Step 2: deploy template.json into the resource group to create the storage account and
// container, the event hub, the Azure Data Explorer cluster and database, and the principals.
Console.WriteLine("Step 2: Create a Blob Storage, a container in the Storage account, an event hub, an Azure Data Explorer cluster, database, and add principals by using an Azure Resource Manager template.");
var deployments = resourceGroup.GetArmDeployments();
var azureResourceTemplatePath = @"xxxxxxxxx\template.json"; //Path to the Azure Resource Manager template JSON from the previous section

// Resource names, all derived from the deployment name.
var eventHubName = deploymentName + "eventhub";
var eventHubNamespaceName = eventHubName + "ns";
var storageAccountName = deploymentName + "storage";
var storageContainerName = deploymentName + "storagecontainer";
var eventGridSubscriptionName = deploymentName + "eventgrid";
var kustoClusterName = deploymentName + "kustocluster";
var kustoDatabaseName = deploymentName + "kustodatabase";
var kustoTableName = "Events";
var kustoColumnMappingName = "Events_CSV_Mapping";
var kustoDataConnectionName = deploymentName + "kustoeventgridconnection";

// Each template parameter is sent in the shape { "<parameterName>": { "value": "<value>" } }.
var templateParameters = new Dictionary<string, Dictionary<string, string>>
{
    ["eventHubNamespaceName"] = new() { ["value"] = eventHubNamespaceName },
    ["eventHubName"] = new() { ["value"] = eventHubName },
    ["storageAccountName"] = new() { ["value"] = storageAccountName },
    ["containerName"] = new() { ["value"] = storageContainerName },
    ["kustoClusterName"] = new() { ["value"] = kustoClusterName },
    ["kustoDatabaseName"] = new() { ["value"] = kustoDatabaseName },
    ["principalIdForCluster"] = new() { ["value"] = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx" }, //Application ID
    ["roleForClusterPrincipal"] = new() { ["value"] = "AllDatabasesAdmin" },
    ["tenantIdForClusterPrincipal"] = new() { ["value"] = tenantId },
    ["principalTypeForCluster"] = new() { ["value"] = "App" },
    ["principalIdForDatabase"] = new() { ["value"] = "xxxxxxxx@xxxxxxxx.com" }, //User Email
    ["roleForDatabasePrincipal"] = new() { ["value"] = "Admin" },
    ["tenantIdForDatabasePrincipal"] = new() { ["value"] = tenantId },
    ["principalTypeForDatabase"] = new() { ["value"] = "User" }
};

var armDeploymentContent = new ArmDeploymentContent(
    new ArmDeploymentProperties(ArmDeploymentMode.Incremental)
    {
        Template = BinaryData.FromString(File.ReadAllText(azureResourceTemplatePath, Encoding.UTF8)),
        // Serialize the dictionary exactly once. Serializing it to a string first and then
        // passing that string to FromObjectAsJson would JSON-encode it a second time and
        // produce a quoted string literal instead of a JSON object.
        Parameters = BinaryData.FromObjectAsJson(templateParameters)
    }
);
await deployments.CreateOrUpdateAsync(WaitUntil.Completed, deploymentName, armDeploymentContent);
// Step 3: subscribe the event hub to blob-created events raised by the storage account.
Console.WriteLine("Step 3: Create an Event Grid subscription to publish blob events created in a specific container to an event hub.");

// ARM resource IDs of the storage account (event source) and the event hub (event destination).
var storageResourceId = new ResourceIdentifier(
    $"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Storage/storageAccounts/{storageAccountName}");
var eventHubResourceId = new ResourceIdentifier(
    $"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.EventHub/namespaces/{eventHubNamespaceName}/eventhubs/{eventHubName}");

// Only forward events whose subject starts with the sample container's path,
// and only the blob-created event type.
var blobCreatedFilter = new EventSubscriptionFilter
{
    SubjectBeginsWith = $"/blobServices/default/containers/{storageContainerName}",
};
blobCreatedFilter.IncludedEventTypes.Add(BlobStorageEventType.MicrosoftStorageBlobCreated.ToString());

var eventSubscriptionData = new EventGridSubscriptionData
{
    Destination = new EventHubEventSubscriptionDestination { ResourceId = eventHubResourceId },
    Filter = blobCreatedFilter
};

// The subscription is scoped to the storage account resource.
var eventSubscriptions = resourceManagementClient.GetEventSubscriptions(storageResourceId);
await eventSubscriptions.CreateOrUpdateAsync(WaitUntil.Completed, eventGridSubscriptionName, eventSubscriptionData);
// Step 4: create the destination table and its CSV ingestion mapping in the database.
Console.WriteLine("Step 4: Create a table (with three columns: EventTime, EventId, and EventSummary) and column mapping in your Azure Data Explorer database.");
var kustoUri = $"https://{kustoClusterName}.{location}.kusto.chinacloudapi.cn";

// Connect to the database as the application (client ID + client secret) in the given tenant.
var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri)
{
    InitialCatalog = kustoDatabaseName,
    FederatedSecurity = true,
    ApplicationClientId = clientId,
    ApplicationKey = clientSecret,
    Authority = tenantId
};

// Builds one CSV column mapping that binds a table column to the field at the given ordinal.
ColumnMapping MapCsvColumn(string columnName, string columnType, string ordinal) => new()
{
    ColumnName = columnName,
    ColumnType = columnType,
    Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, ordinal } }
};

using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    // Table schema: column name paired with its type name.
    var createTableCommand = CslCommandGenerator.GenerateTableCreateCommand(
        kustoTableName,
        new[]
        {
            Tuple.Create("EventTime", "System.DateTime"),
            Tuple.Create("EventId", "System.Int32"),
            Tuple.Create("EventSummary", "System.String"),
        }
    );
    kustoClient.ExecuteControlCommand(createTableCommand);

    // CSV mapping: fields 0, 1 and 2 of each row feed the three table columns.
    var createMappingCommand = CslCommandGenerator.GenerateTableMappingCreateCommand(
        IngestionMappingKind.Csv,
        kustoTableName,
        kustoColumnMappingName,
        new ColumnMapping[]
        {
            MapCsvColumn("EventTime", "dateTime", "0"),
            MapCsvColumn("EventId", "int", "1"),
            MapCsvColumn("EventSummary", "string", "2"),
        }
    );
    kustoClient.ExecuteControlCommand(createMappingCommand);
}
// Step 5: add an Event Grid data connection to the database so that blobs created in the
// storage account are ingested into the table through the CSV mapping.
Console.WriteLine("Step 5: Add an Event Grid data connection. Azure Data Explorer will automatically ingest the data when new blobs are created.");
var cluster = (await resourceGroup.GetKustoClusterAsync(kustoClusterName)).Value;
var database = (await cluster.GetKustoDatabaseAsync(kustoDatabaseName)).Value;
var dataConnections = database.GetKustoDataConnections();
var eventGridDataConnectionData = new KustoEventGridDataConnection
{
    StorageAccountResourceId = storageResourceId,
    // eventHubResourceId identifies the event hub that receives the blob notifications, so it
    // belongs in EventHubResourceId. EventGridResourceId is a different, optional property
    // that refers to the Event Grid subscription itself.
    EventHubResourceId = eventHubResourceId,
    ConsumerGroup = "$Default",
    Location = location,
    TableName = kustoTableName,
    MappingRuleName = kustoColumnMappingName,
    DataFormat = KustoEventGridDataFormat.Csv
};
await dataConnections.CreateOrUpdateAsync(WaitUntil.Completed, kustoDataConnectionName, eventGridDataConnectionData);
设置 | 字段说明 |
---|---|
tenantId | 租户 ID。 它也称为目录 ID。 |
subscriptionId | 用于创建资源的订阅 ID。 |
clientId | 可以访问租户中资源的应用程序的客户端 ID。 |
clientSecret | 可以访问租户中资源的应用程序的客户端机密。 |
测试代码示例
将文件上传到存储帐户。
// Connection string of the storage account that was created programmatically.
var storageConnectionString = "DefaultEndpointsProtocol=https;AccountName=xxxxxxxxxxxxxx;AccountKey=xxxxxxxxxxxxxx;EndpointSuffix=core.chinacloudapi.cn";
var container = new BlobContainerClient(storageConnectionString, storageContainerName);

// Two CSV rows in the column order of the table: EventTime, EventId, EventSummary.
var blobContent = "2007-01-01 00:00:00.0000000,2592,Several trees down\n2007-01-01 00:00:00.0000000,4171,Winter Storm";
await container.UploadBlobAsync("test.csv", BinaryData.FromString(blobContent));
设置 | 字段说明 |
---|---|
storageConnectionString | 以编程方式创建的存储帐户的连接字符串。 |
在 Azure 数据资源管理器中运行测试查询。
// Query the table to confirm that the uploaded blob was ingested.
// Distinct variable names are used here because kustoUri and kustoConnectionStringBuilder
// are already declared by step 4 in the same scope.
var kustoQueryUri = $"https://{kustoClusterName}.{location}.kusto.chinacloudapi.cn";
var kustoQueryConnectionStringBuilder = new KustoConnectionStringBuilder(kustoQueryUri)
{
    InitialCatalog = kustoDatabaseName,
    FederatedSecurity = true,
    ApplicationClientId = clientId,
    ApplicationKey = clientSecret,
    Authority = tenantId
};
using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(kustoQueryConnectionStringBuilder))
{
    var query = $"{kustoTableName} | take 10";
    using var reader = kustoClient.ExecuteQuery(query);

    // Print the first three columns of every returned row.
    while (reader.Read())
    {
        Console.WriteLine($"{reader[0]}, {reader[1]}, {reader[2]}");
    }
}