为 Azure 数据资源管理器创建事件网格数据连接

本文介绍如何使用事件网格数据连接将 blob 从存储帐户引入到 Azure 数据资源管理器。 你将创建一个用于设置 Azure 事件网格订阅的事件网格数据连接。 事件网格订阅通过 Azure 事件中心将事件从存储帐户路由到 Azure 数据资源管理器。

备注

引入支持的最大文件大小为 6 GB。 建议引入 100 MB 到 1 GB 的文件。

若要了解如何使用 Kusto SDK 创建连接,请参阅使用 SDK 创建事件网格数据连接

有关如何从事件网格引入 Azure 数据资源管理器的一般信息,请参阅连接到事件网格

备注

若要使用事件网格连接实现最佳性能,请通过 Blob 元数据设置 rawSizeBytes 引入属性。 有关详细信息,请参阅引入属性

先决条件

创建事件网格数据连接

在本部分,你需要在事件网格与 Azure 数据资源管理器表之间建立连接。

以下示例显示用于添加事件网格数据连接的 Azure 资源管理器模板。 可以使用此窗体在 Azure 门户中编辑和部署模板

{
    "$schema": "https://schema.management.azure.com/schemas/2015-01-01/deploymentTemplate.json#",
    "contentVersion": "1.0.0.0",
    "parameters": {
        "namespaces_eventhubns_name": {
            "type": "string",
            "defaultValue": "eventhubns",
            "metadata": {
                "description": "Specifies the event hub namespace name."
            }
        },
        "EventHubs_eventhubdemo_name": {
            "type": "string",
            "defaultValue": "eventhubdemo",
            "metadata": {
                "description": "Specifies the event hub name."
            }
        },
        "consumergroup_default_name": {
            "type": "string",
            "defaultValue": "$Default",
            "metadata": {
                "description": "Specifies the consumer group of the event hub."
            }
        },
        "StorageAccounts_storagedemo_name": {
            "type": "string",
            "defaultValue": "storagedemo",
            "metadata": {
                "description": "Specifies the storage account name"
            }
        },
        "Clusters_kustocluster_name": {
            "type": "string",
            "defaultValue": "kustocluster",
            "metadata": {
                "description": "Specifies the name of the cluster"
            }
        },
        "databases_kustodb_name": {
            "type": "string",
            "defaultValue": "kustodb",
            "metadata": {
                "description": "Specifies the name of the database"
            }
        },
        "tables_kustotable_name": {
            "type": "string",
            "defaultValue": "kustotable",
            "metadata": {
                "description": "Specifies the name of the table"
            }
        },
        "mapping_kustomapping_name": {
            "type": "string",
            "defaultValue": "kustomapping",
            "metadata": {
                "description": "Specifies the name of the mapping rule"
            }
        },
        "dataformat_type": {
            "type": "string",
            "defaultValue": "csv",
            "metadata": {
                "description": "Specifies the data format"
            }
        },
             "databaseRouting_type": {
            "type": "string",
            "defaultValue": "Single",
            "metadata": {
                "description": "The database routing for the connection. If you set the value to **Single**, the data connection will be routed to a single database in the cluster as specified in the *databaseName* setting. If you set the value to **Multi**, you can override the default target database using the *Database* EventData property."
            }
        },
        "dataconnections_kustodc_name": {
            "type": "string",
            "defaultValue": "kustodc",
            "metadata": {
                "description": "Name of the data connection to create"
            }
        },
        "subscriptionId": {
            "type": "string",
            "defaultValue": "[subscription().subscriptionId]",
            "metadata": {
                "description": "Specifies the subscriptionId of the resources"
            }
        },
        "resourceGroup": {
            "type": "string",
            "defaultValue": "[resourceGroup().name]",
            "metadata": {
                "description": "Specifies the resourceGroup of the resources"
            }
        },
        "location": {
            "type": "string",
            "defaultValue": "[resourceGroup().location]",
            "metadata": {
                "description": "Location for all resources."
            }
        }
    },
    "variables": {
    },
    "resources": [{
            "type": "Microsoft.Kusto/Clusters/Databases/DataConnections",
            "apiVersion": "2022-02-01",
            "name": "[concat(parameters('Clusters_kustocluster_name'), '/', parameters('databases_kustodb_name'), '/', parameters('dataconnections_kustodc_name'))]",
            "location": "[parameters('location')]",
            "kind": "EventGrid",
            "properties": {
                "managedIdentityResourceId": "[resourceId('Microsoft.Kusto/clusters', parameters('clusters_kustocluster_name'))]",
                "storageAccountResourceId": "[resourceId(parameters('subscriptionId'), parameters('resourceGroup'), 'Microsoft.Storage/storageAccounts', parameters('StorageAccounts_storagedemo_name'))]",
                "eventHubResourceId": "[resourceId(parameters('subscriptionId'), parameters('resourceGroup'), 'Microsoft.EventHub/namespaces/eventhubs', parameters('namespaces_eventhubns_name'), parameters('EventHubs_eventhubdemo_name'))]",
                "consumerGroup": "[parameters('consumergroup_default_name')]",
                "tableName": "[parameters('tables_kustotable_name')]",
                "mappingRuleName": "[parameters('mapping_kustomapping_name')]",
                "dataFormat": "[parameters('dataformat_type')]",
                "databaseRouting": "[parameters('databaseRouting_type')]"
            }
        }
    ]
}

使用事件网格数据连接

本部分介绍如何在创建 Blob 或重命名 Blob 后,触发从 Azure Blob 存储或 Azure Data Lake Gen 2 到群集的引入。

根据用于上传 Blob 的存储 SDK 类型选择相关的选项卡。

以下代码示例使用 Azure Blob 存储 SDK 将文件上传到 Azure Blob 存储。 上传会触发事件网格数据连接,该连接将数据引入到 Azure 数据资源管理器中。

var azureStorageAccountConnectionString = <storage_account_connection_string>;
var containerName = <container_name>;
var blobName = <blob_name>;
var localFileName = <file_to_upload>;
var uncompressedSizeInBytes = <uncompressed_size_in_bytes>;
var mapping = <mapping_reference>;
// Create a new container if it not already exists.
var azureStorageAccount = new BlobServiceClient(azureStorageAccountConnectionString);
var container = azureStorageAccount.GetBlobContainerClient(containerName);
container.CreateIfNotExists();
// Define blob metadata and uploading options.
IDictionary<String, String> metadata = new Dictionary<string, string>();
metadata.Add("rawSizeBytes", uncompressedSizeInBytes);
metadata.Add("kustoIngestionMappingReference", mapping);
var uploadOptions = new BlobUploadOptions
{
    Metadata = metadata,
};
// Upload the file.
var blob = container.GetBlobClient(blobName);
blob.Upload(localFileName, uploadOptions);

备注

Azure 数据资源管理器在引入后不会删除 blob。 使用 Azure Blob 存储生命周期管理 blob 删除,将 blob 保留三到五天。

备注

启用了分层命名空间功能的存储帐户不支持在 CopyBlob 操作后触发引入。

重要

我们强烈建议不要从自定义代码生成存储事件并将其发送到事件中心。 如果选择这样做,请确保生成的事件严格遵循相应的存储事件架构和 JSON 格式规范。

删除事件网格数据连接

若要在 Azure 门户中删除事件网格连接,请执行以下步骤:

  1. 转到你的群集。 在左侧菜单中选择“数据库”。 然后选择包含目标表的数据库。
  2. 从左侧菜单选择“数据连接”。 然后,选中相关的事件网格数据连接旁边的复选框。
  3. 从顶部菜单栏中选择“删除”。