以端到端方式将 blob 引入到 Azure 数据资源管理器中

Azure 数据资源管理器是一项快速且可缩放的数据探索服务,适用于日志和遥测数据。 本文提供了有关如何将数据从 Azure Blob 存储引入 Azure 数据资源管理器的端到端示例。

你将了解如何以编程方式创建资源组、存储帐户和容器、事件中心以及 Azure 数据资源管理器群集和数据库。 你还将了解如何以编程方式配置 Azure 数据资源管理器以从新存储帐户引入数据。

有关基于以前的 SDK 版本的代码示例,请参阅存档的文章。

先决条件

安装包

本文包含 C# 和 Python 的示例。 选择首选语言的选项卡,并安装所需的包。

Azure Resource Manager 模板

在本文中,你将使用 Azure 资源管理器 (ARM) 模板来创建资源组、存储帐户和容器、事件中心以及 Azure 数据资源管理器群集和数据库。 将以下内容保存在名为 template.json 的文件中。 使用此文件来运行代码示例。

{
    "$schema": "https://schema.management.azure.com/schemas/2015-01-01/deploymentTemplate.json#",
    "contentVersion": "1.0.0.0",
    "parameters": {
        "eventHubNamespaceName": {
            "type": "string",
            "metadata": {
                "description": "Specifies the event hub namespace name."
            }
        },
        "eventHubName": {
            "type": "string",
            "metadata": {
                "description": "Specifies the event hub name."
            }
        },
        "storageAccountType": {
            "type": "string",
            "defaultValue": "Standard_LRS",
            "allowedValues": ["Standard_LRS", "Standard_GRS", "Standard_ZRS", "Premium_LRS"],
            "metadata": {
                "description": "Storage Account type"
            }
        },
        "storageAccountName": {
            "type": "string",
            "defaultValue": "[concat('storage', uniqueString(resourceGroup().id))]",
            "metadata": {
                "description": "Name of the storage account to create"
            }
        },
        "containerName": {
            "type": "string",
            "defaultValue": "[concat('storagecontainer', uniqueString(resourceGroup().id))]",
            "metadata": {
                "description": "Name of the container in storage account to create"
            }
        },
        "eventHubSku": {
            "type": "string",
            "allowedValues": ["Basic", "Standard"],
            "defaultValue": "Standard",
            "metadata": {
                "description": "Specifies the messaging tier for the Event Hubs namespace."
            }
        },
        "kustoClusterName": {
            "type": "string",
            "defaultValue": "[concat('kusto', uniqueString(resourceGroup().id))]",
            "metadata": {
                "description": "Name of the cluster to create"
            }
        },
        "kustoDatabaseName": {
            "type": "string",
            "defaultValue": "kustodb",
            "metadata": {
                "description": "Name of the database to create"
            }
        },
        "clusterPrincipalAssignmentName": {
            "type": "string",
            "defaultValue": "clusterPrincipalAssignment1",
            "metadata": {
                "description": "Specifies the name of the principal assignment"
            }
        },
        "principalIdForCluster": {
            "type": "string",
            "metadata": {
                "description": "Specifies the principal id. It can be user email, application (client) ID, security group name"
            }
        },
        "roleForClusterPrincipal": {
            "type": "string",
            "defaultValue": "AllDatabasesViewer",
            "metadata": {
                "description": "Specifies the cluster principal role. It can be 'AllDatabasesAdmin', 'AllDatabasesMonitor' or 'AllDatabasesViewer'"
            }
        },
        "tenantIdForClusterPrincipal": {
            "type": "string",
            "metadata": {
                "description": "Specifies the tenantId of the cluster principal"
            }
        },
        "principalTypeForCluster": {
            "type": "string",
            "defaultValue": "App",
            "metadata": {
                "description": "Specifies the principal type. It can be 'User', 'App', 'Group'"
            }
        },
        "databasePrincipalAssignmentName": {
            "type": "string",
            "defaultValue": "databasePrincipalAssignment1",
            "metadata": {
                "description": "Specifies the name of the principal assignment"
            }
        },
        "principalIdForDatabase": {
            "type": "string",
            "metadata": {
                "description": "Specifies the principal id. It can be user email, application (client) ID, security group name"
            }
        },
        "roleForDatabasePrincipal": {
            "type": "string",
            "defaultValue": "Admin",
            "metadata": {
                "description": "Specifies the database principal role. It can be 'Admin', 'Ingestor', 'Monitor', 'User', 'UnrestrictedViewers', 'Viewer'"
            }
        },
        "tenantIdForDatabasePrincipal": {
            "type": "string",
            "metadata": {
                "description": "Specifies the tenantId of the database principal"
            }
        },
        "principalTypeForDatabase": {
            "type": "string",
            "defaultValue": "App",
            "metadata": {
                "description": "Specifies the principal type. It can be 'User', 'App', 'Group'"
            }
        },
        "location": {
            "type": "string",
            "defaultValue": "[resourceGroup().location]",
            "metadata": {
                "description": "Location for all resources."
            }
        }
    },
    "variables": {
    },
    "resources": [{
            "apiVersion": "2017-04-01",
            "type": "Microsoft.EventHub/namespaces",
            "name": "[parameters('eventHubNamespaceName')]",
            "location": "[parameters('location')]",
            "sku": {
                "name": "[parameters('eventHubSku')]",
                "tier": "[parameters('eventHubSku')]",
                "capacity": 1
            },
            "properties": {
                "isAutoInflateEnabled": false,
                "maximumThroughputUnits": 0
            }
        }, {
            "apiVersion": "2017-04-01",
            "type": "Microsoft.EventHub/namespaces/eventhubs",
            "name": "[concat(parameters('eventHubNamespaceName'), '/', parameters('eventHubName'))]",
            "location": "[parameters('location')]",
            "dependsOn": ["[resourceId('Microsoft.EventHub/namespaces', parameters('eventHubNamespaceName'))]"],
            "properties": {
                "messageRetentionInDays": 7,
                "partitionCount": 1
            }
        }, {
            "type": "Microsoft.Storage/storageAccounts",
            "name": "[parameters('storageAccountName')]",
            "location": "[parameters('location')]",
            "apiVersion": "2018-07-01",
            "sku": {
                "name": "[parameters('storageAccountType')]"
            },
            "kind": "StorageV2",
            "resources": [
                {
                    "name": "[concat('default/', parameters('containerName'))]",
                    "type": "blobServices/containers",
                    "apiVersion": "2018-07-01",
                    "dependsOn": [
                        "[parameters('storageAccountName')]"
                    ],
                    "properties": {
                        "publicAccess": "None"
                    }
                }
            ],
            "properties": {}
        }, {
            "name": "[parameters('kustoClusterName')]",
            "type": "Microsoft.Kusto/clusters",
            "sku": {
                "name": "Standard_D13_v2",
                "tier": "Standard",
                "capacity": 2
            },
            "apiVersion": "2019-09-07",
            "location": "[parameters('location')]",
            "tags": {
                "Created By": "GitHub quickstart template"
            }
        }, {
            "name": "[concat(parameters('kustoClusterName'), '/', parameters('kustoDatabaseName'))]",
            "type": "Microsoft.Kusto/clusters/databases",
            "apiVersion": "2019-09-07",
            "location": "[parameters('location')]",
            "dependsOn": ["[resourceId('Microsoft.Kusto/clusters', parameters('kustoClusterName'))]"],
            "properties": {
                "softDeletePeriodInDays": 365,
                "hotCachePeriodInDays": 31
            }
        }, {
            "type": "Microsoft.Kusto/Clusters/principalAssignments",
            "apiVersion": "2019-11-09",
            "name": "[concat(parameters('kustoClusterName'), '/', parameters('clusterPrincipalAssignmentName'))]",
            "dependsOn": ["[resourceId('Microsoft.Kusto/clusters', parameters('kustoClusterName'))]"],
            "properties": {
                "principalId": "[parameters('principalIdForCluster')]",
                "role": "[parameters('roleForClusterPrincipal')]",
                "tenantId": "[parameters('tenantIdForClusterPrincipal')]",
                "principalType": "[parameters('principalTypeForCluster')]"
            }
        }, {
            "type": "Microsoft.Kusto/Clusters/Databases/principalAssignments",
            "apiVersion": "2019-11-09",
            "name": "[concat(parameters('kustoClusterName'), '/', parameters('kustoDatabaseName'), '/', parameters('databasePrincipalAssignmentName'))]",
            "dependsOn": ["[resourceId('Microsoft.Kusto/clusters/databases', parameters('kustoClusterName'), parameters('kustoDatabaseName'))]"],
            "properties": {
                "principalId": "[parameters('principalIdForDatabase')]",
                "role": "[parameters('roleForDatabasePrincipal')]",
                "tenantId": "[parameters('tenantIdForDatabasePrincipal')]",
                "principalType": "[parameters('principalTypeForDatabase')]"
            }
        }
    ]
}

代码示例

下面的代码示例提供了一个分步过程,完成这些步骤后,数据将被引入到 Azure 数据资源管理器中。

首先创建一个资源组。 还将创建 Azure 资源,例如存储帐户和容器、事件中心以及 Azure 数据资源管理器群集和数据库,并添加主体。 然后,在 Azure 数据资源管理器数据库中创建 Azure 事件网格订阅以及表和列映射。 最后,创建数据连接,将 Azure 数据资源管理器配置为从新存储帐户引入数据。

// Identity of the service principal and the subscription in which the sample creates its resources.
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"; //Directory (tenant) ID
var clientId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"; //Application ID
var clientSecret = "PlaceholderClientSecret"; //Client Secret
var subscriptionId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
// This sample targets the *.chinacloudapi.cn endpoints, so the credential and the ARM client
// must both point at the Azure China cloud instead of the default public-cloud endpoints.
var credentials = new ClientSecretCredential(
    tenantId,
    clientId,
    clientSecret,
    new ClientSecretCredentialOptions { AuthorityHost = AzureAuthorityHosts.AzureChina });
var resourceManagementClient = new ArmClient(
    credentials,
    subscriptionId,
    new ArmClientOptions { Environment = ArmEnvironment.AzureChina });
// Prefix from which the names of all resources created by this sample are derived.
var deploymentName = "e2eexample";
// Step 1: look up the subscription and create (or update) the resource group that holds every resource of the sample.
Console.WriteLine("Step 1: Create a new resource group in your Azure subscription to manage all the resources for using Azure Data Explorer.");
var subscriptions = resourceManagementClient.GetSubscriptions();
var subscription = (await subscriptions.GetAsync(subscriptionId)).Value;
var resourceGroups = subscription.GetResourceGroups();
var resourceGroupName = deploymentName + "resourcegroup";
// AzureLocation exposes its well-known regions as PascalCase static members; its string form is
// interpolated into the cluster URI later in the sample.
var location = AzureLocation.ChinaEast2;
var resourceGroupData = new ResourceGroupData(location);
// WaitUntil.Completed blocks until the long-running operation has finished, so the group exists before Step 2.
var resourceGroup = (await resourceGroups.CreateOrUpdateAsync(WaitUntil.Completed, resourceGroupName, resourceGroupData)).Value;
// Step 2: deploy the ARM template into the resource group, passing the generated resource names as parameters.
Console.WriteLine("Step 2: Create a Blob Storage, a container in the Storage account, an event hub, an Azure Data Explorer cluster, database, and add principals by using an Azure Resource Manager template.");
var deployments = resourceGroup.GetArmDeployments();
var azureResourceTemplatePath = @"xxxxxxxxx\template.json"; //Path to the Azure Resource Manager template JSON from the previous section

// Every resource name is derived from the deployment name.
var eventHubName = $"{deploymentName}eventhub";
var eventHubNamespaceName = $"{eventHubName}ns";
var storageAccountName = $"{deploymentName}storage";
var storageContainerName = $"{deploymentName}storagecontainer";
var eventGridSubscriptionName = $"{deploymentName}eventgrid";
var kustoClusterName = $"{deploymentName}kustocluster";
var kustoDatabaseName = $"{deploymentName}kustodatabase";
var kustoTableName = "Events";
var kustoColumnMappingName = "Events_CSV_Mapping";
var kustoDataConnectionName = $"{deploymentName}kustoeventgridconnection";

// Wraps a single parameter value in a one-entry dictionary keyed by "value".
static Dictionary<string, string> AsParameter(string parameterValue) =>
    new(capacity: 1) { ["value"] = parameterValue };

var templateJson = File.ReadAllText(azureResourceTemplatePath, Encoding.UTF8);
var templateParameters = new Dictionary<string, Dictionary<string, string>>
{
    ["eventHubNamespaceName"] = AsParameter(eventHubNamespaceName),
    ["eventHubName"] = AsParameter(eventHubName),
    ["storageAccountName"] = AsParameter(storageAccountName),
    ["containerName"] = AsParameter(storageContainerName),
    ["kustoClusterName"] = AsParameter(kustoClusterName),
    ["kustoDatabaseName"] = AsParameter(kustoDatabaseName),
    ["principalIdForCluster"] = AsParameter("xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"), //Application ID
    ["roleForClusterPrincipal"] = AsParameter("AllDatabasesAdmin"),
    ["tenantIdForClusterPrincipal"] = AsParameter(tenantId),
    ["principalTypeForCluster"] = AsParameter("App"),
    ["principalIdForDatabase"] = AsParameter("xxxxxxxx@xxxxxxxx.com"), //User Email
    ["roleForDatabasePrincipal"] = AsParameter("Admin"),
    ["tenantIdForDatabasePrincipal"] = AsParameter(tenantId),
    ["principalTypeForDatabase"] = AsParameter("User")
};

// NOTE(review): the parameters are serialized to a JSON string and that string is then passed to
// FromObjectAsJson, i.e. it is encoded a second time. Presumably the service accepts parameters
// supplied as a JSON string - confirm before simplifying.
var deploymentProperties = new ArmDeploymentProperties(ArmDeploymentMode.Incremental)
{
    Template = BinaryData.FromString(templateJson),
    Parameters = BinaryData.FromObjectAsJson(JsonConvert.SerializeObject(templateParameters))
};
var armDeploymentContent = new ArmDeploymentContent(deploymentProperties);
await deployments.CreateOrUpdateAsync(WaitUntil.Completed, deploymentName, armDeploymentContent);
// Step 3: subscribe the event hub to blob-created events raised for the sample's storage container.
Console.WriteLine("Step 3: Create an Event Grid subscription to publish blob events created in a specific container to an event hub.");
// ARM identifiers of the storage account (event source) and the event hub (event destination).
var storageResourceId = new ResourceIdentifier($"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Storage/storageAccounts/{storageAccountName}");
var eventHubResourceId = new ResourceIdentifier($"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.EventHub/namespaces/{eventHubNamespaceName}/eventhubs/{eventHubName}");

// Only events whose subject starts with the container path and whose type is "blob created" pass the filter.
var blobCreatedFilter = new EventSubscriptionFilter
{
    SubjectBeginsWith = $"/blobServices/default/containers/{storageContainerName}",
};
blobCreatedFilter.IncludedEventTypes.Add(BlobStorageEventType.MicrosoftStorageBlobCreated.ToString());

var eventSubscriptionData = new EventGridSubscriptionData
{
    Destination = new EventHubEventSubscriptionDestination { ResourceId = eventHubResourceId },
    Filter = blobCreatedFilter
};

// The subscription collection is scoped to the storage account.
var eventSubscriptions = resourceManagementClient.GetEventSubscriptions(storageResourceId);
await eventSubscriptions.CreateOrUpdateAsync(WaitUntil.Completed, eventGridSubscriptionName, eventSubscriptionData);
// Step 4: create the destination table and the CSV ingestion mapping in the database.
Console.WriteLine("Step 4: Create a table (with three columns: EventTime, EventId, and EventSummary) and column mapping in your Azure Data Explorer database.");
var kustoUri = $"https://{kustoClusterName}.{location}.kusto.chinacloudapi.cn";
// Authenticate against the cluster with the same application (client ID and secret) used for ARM.
var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri)
{
    InitialCatalog = kustoDatabaseName,
    FederatedSecurity = true,
    ApplicationClientId = clientId,
    ApplicationKey = clientSecret,
    Authority = tenantId
};

// Table schema: column name paired with its .NET type name.
var tableColumns = new[]
{
    Tuple.Create("EventTime", "System.DateTime"),
    Tuple.Create("EventId", "System.Int32"),
    Tuple.Create("EventSummary", "System.String"),
};

// CSV mapping: each table column is bound to the ordinal position of a CSV field.
var csvColumnMappings = new ColumnMapping[]
{
    new() { ColumnName = "EventTime", ColumnType = "dateTime", Properties = new Dictionary<string, string> { [MappingConsts.Ordinal] = "0" } },
    new() { ColumnName = "EventId", ColumnType = "int", Properties = new Dictionary<string, string> { [MappingConsts.Ordinal] = "1" } },
    new() { ColumnName = "EventSummary", ColumnType = "string", Properties = new Dictionary<string, string> { [MappingConsts.Ordinal] = "2" } },
};

using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    // The table is created first because the mapping command names it.
    var createTableCommand = CslCommandGenerator.GenerateTableCreateCommand(kustoTableName, tableColumns);
    kustoClient.ExecuteControlCommand(createTableCommand);

    var createMappingCommand = CslCommandGenerator.GenerateTableMappingCreateCommand(
        IngestionMappingKind.Csv,
        kustoTableName,
        kustoColumnMappingName,
        csvColumnMappings);
    kustoClient.ExecuteControlCommand(createMappingCommand);
}
// Step 5: create the Event Grid data connection on the database so that blobs announced through
// the event hub are ingested into the table using the CSV mapping.
Console.WriteLine("Step 5: Add an Event Grid data connection. Azure Data Explorer will automatically ingest the data when new blobs are created.");
var cluster = (await resourceGroup.GetKustoClusterAsync(kustoClusterName)).Value;
var database = (await cluster.GetKustoDatabaseAsync(kustoDatabaseName)).Value;
var dataConnections = database.GetKustoDataConnections();
var eventGridDataConnectionData = new KustoEventGridDataConnection
{
    StorageAccountResourceId = storageResourceId,
    // The event hub that receives the blob events goes in EventHubResourceId; EventGridResourceId
    // is a different property and must not be given an event hub ID.
    EventHubResourceId = eventHubResourceId,
    ConsumerGroup = "$Default",
    Location = location,
    TableName = kustoTableName,
    MappingRuleName = kustoColumnMappingName,
    DataFormat = KustoEventGridDataFormat.Csv
};
await dataConnections.CreateOrUpdateAsync(WaitUntil.Completed, kustoDataConnectionName, eventGridDataConnectionData);
设置 字段说明
tenantId 租户 ID。 它也称为目录 ID。
subscriptionId 用于创建资源的订阅 ID。
clientId 可以访问租户中资源的应用程序的客户端 ID。
clientSecret 可以访问租户中资源的应用程序的客户端密码。

测试代码示例

  1. 将文件上传到存储帐户。

    // Connection string of the storage account that was created programmatically.
    var storageConnectionString = "DefaultEndpointsProtocol=https;AccountName=xxxxxxxxxxxxxx;AccountKey=xxxxxxxxxxxxxx;EndpointSuffix=core.chinacloudapi.cn";
    var container = new BlobContainerClient(storageConnectionString, storageContainerName);
    // Two CSV rows whose fields follow the order of the Events_CSV_Mapping ordinals: EventTime, EventId, EventSummary.
    var blobContent = "2007-01-01 00:00:00.0000000,2592,Several trees down\n2007-01-01 00:00:00.0000000,4171,Winter Storm";
    await container.UploadBlobAsync("test.csv", BinaryData.FromString(blobContent));
    
    设置 字段说明
    storageConnectionString 以编程方式创建的存储帐户的连接字符串。
  2. 在 Azure 数据资源管理器中运行测试查询。

    // Build the cluster URI from the same location value that was used to create the resources.
    var kustoUri = $"https://{kustoClusterName}.{location}.kusto.chinacloudapi.cn";
    var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri)
    {
        InitialCatalog = kustoDatabaseName,
        FederatedSecurity = true,
        ApplicationClientId = clientId,
        ApplicationKey = clientSecret,
        Authority = tenantId
    };
    using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(kustoConnectionStringBuilder))
    {
        var query = $"{kustoTableName} | take 10";

        // Use the reader exactly as returned: casting it with `as` would silently print nothing
        // if the concrete reader type were ever different.
        using var reader = kustoClient.ExecuteQuery(query);
        // Print the first three columns of every row of the current result set.
        while (reader.Read())
        {
            Console.WriteLine($"{reader[0]}, {reader[1]}, {reader[2]}");
        }
    }