Integrate Azure Data Explorer with Azure Data Factory

Azure Data Factory (ADF) is a cloud-based data integration service that allows you to integrate different data stores and perform activities on the data. ADF allows you to create data-driven workflows for orchestrating and automating data movement and data transformation. Azure Data Explorer is one of the supported data stores in Azure Data Factory.

Azure Data Factory activities for Azure Data Explorer

Various integrations with Azure Data Factory are available for Azure Data Explorer users:

Copy activity

Azure Data Factory Copy activity is used to transfer data between data stores. Azure Data Explorer is supported as a source, where data is copied from Azure Data Explorer to any supported data store, and as a sink, where data is copied from any supported data store to Azure Data Explorer. For more information, see copy data to or from Azure Data Explorer using Azure Data Factory, and for a detailed walk-through, see load data from Azure Data Factory into Azure Data Explorer. Azure Data Explorer is supported by Azure IR (Integration Runtime), used when data is copied within Azure, and by self-hosted IR, used when copying data from or to data stores located on-premises or in a network with access control, such as an Azure Virtual Network. For more information, see which IR to use.

Tip

When using the copy activity and creating a Linked Service or a Dataset, select the data store Azure Data Explorer (Kusto), and not the old data store Kusto.
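For reference, the following is a minimal sketch of what an Azure Data Explorer (Kusto) linked service definition might look like in ADF's JSON. The endpoint, database, and service principal values are placeholders, and optional properties are omitted.

{
    "name": "AzureDataExplorerLinkedService",
    "properties": {
        "type": "AzureDataExplorer",
        "typeProperties": {
            "endpoint": "https://<cluster name>.<region>.kusto.windows.net",
            "database": "<database name>",
            "tenant": "<tenant ID>",
            "servicePrincipalId": "<application (client) ID>",
            "servicePrincipalKey": {
                "type": "SecureString",
                "value": "<application key>"
            }
        }
    }
}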

Lookup activity

The Lookup activity is used for executing queries on Azure Data Explorer. The result of the query is returned as the output of the Lookup activity and can be used in the next activity in the pipeline, as described in the ADF Lookup documentation.
In addition to the response size limit of 5,000 rows and 2 MB, the activity also has a query timeout limit of 1 hour.
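As a sketch only (the activity name, dataset reference, and query below are hypothetical, and other activity properties are omitted), a Lookup activity that runs a query against an Azure Data Explorer dataset might look like:

{
    "name": "LookupTop10States",
    "type": "Lookup",
    "typeProperties": {
        "source": {
            "type": "AzureDataExplorerSource",
            "query": "StormEvents | summarize Count = count() by State | top 10 by Count",
            "queryTimeout": "00:10:00"
        },
        "dataset": {
            "referenceName": "AzureDataExplorerDataset",
            "type": "DatasetReference"
        },
        "firstRowOnly": false
    }
}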

Command activity

The Command activity allows the execution of Azure Data Explorer control commands. Unlike queries, control commands can potentially modify data or metadata. Some of the control commands are targeted to ingest data into Azure Data Explorer, using commands such as .ingest or .set-or-append, or to copy data from Azure Data Explorer to external data stores, using commands such as .export. For a detailed walk-through of the command activity, see use Azure Data Factory command activity to run Azure Data Explorer control commands. Using a control command to copy data can, at times, be a faster and cheaper option than the Copy activity. To determine when to use the Command activity versus the Copy activity, see select between Copy and Command activities when copying data.
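For illustration, here is a minimal sketch of a Command activity that runs an ingest-from-query control command. The activity name, linked service name, target table, and query are hypothetical.

{
    "name": "IngestSummaryTable",
    "type": "AzureDataExplorerCommand",
    "linkedServiceName": {
        "referenceName": "AzureDataExplorerLinkedService",
        "type": "LinkedServiceReference"
    },
    "typeProperties": {
        "command": ".set-or-append StormEventsSummary <| StormEvents | summarize Count = count() by State",
        "commandTimeout": "01:00:00"
    }
}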

Copy in bulk from a database template

The Copy in bulk from a database to Azure Data Explorer by using the Azure Data Factory template is a predefined Azure Data Factory pipeline. The template is used to create many pipelines per database or per table for faster data copying.

Select between Copy and Azure Data Explorer Command activities when copying data

This section will assist you in selecting the correct activity for your data copying needs.

When copying data from or to Azure Data Explorer, there are two available options in Azure Data Factory:

  • Copy activity.
  • Azure Data Explorer Command activity, which executes one of the control commands that transfer data in Azure Data Explorer.

Copy data from Azure Data Explorer

You can copy data from Azure Data Explorer using the Copy activity or the .export command. The .export command executes a query, and then exports the results of the query.

See the following comparison of the Copy activity and the .export command for copying data from Azure Data Explorer.

Flow description
  • Copy activity: ADF executes a query on Kusto, processes the result, and sends it to the target data store (ADX > ADF > sink data store).
  • .export command: ADF sends an .export control command to Azure Data Explorer, which executes the command and sends the data directly to the target data store (ADX > sink data store).

Supported target data stores
  • Copy activity: a wide variety of supported data stores.
  • .export command: ADLSv2, Azure Blob, SQL Database.

Performance
  • Copy activity: centralized.
  • .export command: distributed (default), exporting data from multiple nodes concurrently; faster and more COGS-efficient.

Server limits
  • Copy activity: query limits can be extended or disabled. By default, ADF queries contain a size limit of 500,000 records or 64 MB, a time limit of 10 minutes, and noTruncation set to false.
  • .export command: by default, extends or disables the query limits: size limits are disabled, server timeout is extended to 1 hour, and MaxMemoryConsumptionPerIterator and MaxMemoryConsumptionPerQueryPerNode are extended to max (5 GB, TotalPhysicalMemory/2).

Tip

If your copy destination is one of the data stores supported by the .export command, and if none of the Copy activity features is crucial to your needs, select the .export command.
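For example, a Command activity that exports query results directly to blob storage could look roughly like the sketch below. The storage URI, table, and query are placeholders, and the exact .export options depend on your scenario.

{
    "name": "ExportStormEvents",
    "type": "AzureDataExplorerCommand",
    "linkedServiceName": {
        "referenceName": "AzureDataExplorerLinkedService",
        "type": "LinkedServiceReference"
    },
    "typeProperties": {
        "command": ".export to csv (h@\"https://<storage account>.blob.core.windows.net/<container>;<account key>\") <| StormEvents | where StartTime > ago(1d)",
        "commandTimeout": "01:00:00"
    }
}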

Copy data to Azure Data Explorer

You can copy data to Azure Data Explorer using the Copy activity or ingestion commands such as ingest from query (.set-or-append, .set-or-replace, .set, .replace) and ingest from storage (.ingest).

See the following comparison of the Copy activity and the ingestion commands for copying data to Azure Data Explorer.

Flow description
  • Copy activity: ADF gets the data from the source data store, converts it into a tabular format, and does the required schema-mapping changes. ADF then uploads the data to Azure blobs, splits it into chunks, then downloads the blobs to ingest them into the ADX table (source data store > ADF > Azure blobs > ADX).
  • Ingest from query (.set-or-append / .set-or-replace / .set / .replace): these commands can execute a query or a .show command and ingest the results of the query into a table (ADX > ADX).
  • Ingest from storage (.ingest): this command ingests data into a table by "pulling" the data from one or more cloud storage artifacts.

Supported source data stores
  • Copy activity: a wide variety of options.
  • Ingest from query: ADLS Gen 2, Azure Blob, SQL (using the sql_request plugin), Cosmos (using the cosmosdb_sql_request plugin), and any other data store that provides HTTP or Python APIs.
  • Ingest from storage: Filesystem, Azure Blob Storage, ADLS Gen 1, ADLS Gen 2.

Performance
  • Copy activity: ingestions are queued and managed, which ensures small-size ingestions and assures high availability by providing load balancing, retries, and error handling.
  • Ingestion commands: these commands weren't designed for high-volume data importing. They work as expected and are cheaper, but for production scenarios, and when traffic rates and data sizes are large, use the Copy activity.

Server limits
  • Copy activity: no size limit; max timeout limit of 1 hour per ingested blob.
  • Ingest from query: there's only a size limit on the query part, which can be skipped by specifying noTruncation=true; max timeout limit of 1 hour.
  • Ingest from storage: no size limit; max timeout limit of 1 hour.

Tip

  • When copying data from ADF to Azure Data Explorer, use the ingest from query commands.
  • For large data sets (>1 GB), use the Copy activity.

Required permissions

The following lists the required permissions for the various steps in the integration with Azure Data Factory.

Create a Linked Service
  • Database navigation (minimum permission: database viewer). The logged-in user using ADF should be authorized to read database metadata. The user can also provide the database name manually.
  • Test Connection (minimum permission: database monitor or table ingestor). The service principal should be authorized to execute database-level .show commands or table-level ingestion. TestConnection verifies the connection to the cluster, not to the database, so it can succeed even if the database doesn't exist. Table admin permissions aren't sufficient.

Creating a Dataset
  • Table navigation (minimum permission: database monitor). The logged-in user using ADF must be authorized to execute database-level .show commands. The user can also provide the table name manually.

Creating a Dataset or Copy activity
  • Preview data (minimum permission: database viewer). The service principal must be authorized to read database metadata.
  • Import schema (minimum permission: database viewer). The service principal must be authorized to read database metadata. When ADX is the source of a tabular-to-tabular copy, ADF imports the schema automatically, even if the user didn't import it explicitly.

ADX as sink
  • Create a by-name column mapping (minimum permission: database monitor). The service principal must be authorized to execute database-level .show commands. All mandatory operations work with table ingestor; some optional operations can fail.
  • Create a CSV mapping on the table, or drop the mapping (minimum permission: table ingestor or database admin). The service principal must be authorized to make changes to a table.
  • Ingest data (minimum permission: table ingestor or database admin). The service principal must be authorized to make changes to a table.

ADX as source
  • Execute query (minimum permission: database viewer). The service principal must be authorized to read database metadata.
  • Kusto command: the minimum permission level depends on each command.

Performance

If Azure Data Explorer is the source and you use a Lookup, Copy, or Command activity that contains a query, see query best practices for performance information and the ADF documentation for the Copy activity.

This section addresses the use of the Copy activity where Azure Data Explorer is the sink. The estimated throughput for the Azure Data Explorer sink is 11-13 MBps. The following list details the parameters influencing the performance of the Azure Data Explorer sink.

  • Components geographical proximity: place all components in the same region: the source and sink data stores, the ADF integration runtime, and your ADX cluster. Make sure that at least your integration runtime is in the same region as your ADX cluster.
  • Number of DIUs: ADF uses one VM for every four DIUs. Increasing the DIUs helps only if your source is a file-based store with multiple files; each VM then processes a different file in parallel. Therefore, copying a single large file has a higher latency than copying multiple smaller files.
  • Amount and SKU of your ADX cluster: a high number of ADX nodes improves ingestion processing time.
  • Parallelism: to copy a very large amount of data from a database, partition your data and then use a ForEach loop that copies each partition in parallel, or use the Bulk Copy from Database to Azure Data Explorer template. Note: Settings > Degree of Parallelism in the Copy activity isn't relevant to ADX.
  • Data processing complexity: latency varies according to the source file format, column mapping, and compression.
  • The VM running your integration runtime: for Azure copy, the ADF VMs and machine SKUs can't be changed; for on-premises to Azure copy, verify that the VM hosting your self-hosted IR is strong enough.

Tips and common pitfalls

Monitor activity progress

  • When monitoring the activity progress, the Data written property may be much larger than the Data read property, because Data read is calculated according to the binary file size, while Data written is calculated according to the in-memory size after the data is deserialized and decompressed.

  • When monitoring the activity progress, you may see that data has been written to the Azure Data Explorer sink, but when you query the Azure Data Explorer table, the data hasn't arrived yet. This is because copying to Azure Data Explorer happens in two stages:

    • The first stage reads the source data, splits it into 900-MB chunks, and uploads each chunk to an Azure blob. This stage is what the ADF activity progress view shows.
    • The second stage begins once all the data is uploaded to Azure blobs. The Azure Data Explorer engine nodes download the blobs and ingest the data into the sink table. The data is then seen in your Azure Data Explorer table.

Failure to ingest CSV files due to improper escaping

Azure Data Explorer expects CSV files to align with RFC 4180. It expects:

  • Fields that contain characters that require escaping (such as " and new lines) should start and end with a " character, without whitespace. All " characters inside the field are escaped by using a double " character (""). For example, "Hello, ""World""" is a valid CSV file with a single record having a single column or field with the content Hello, "World".
  • All records in the file must have the same number of columns and fields.

Azure Data Factory allows the backslash (escape) character. If you generate a CSV file with a backslash escape character using Azure Data Factory, ingestion of the file into Azure Data Explorer will fail.

Example

The following text values:

Hello, "World"
ABC DEF
"ABC\D"EF
"ABC DEF

Should appear in a proper CSV file as follows:

"Hello, ""World"""
"ABC DEF"
"""ABC\D""EF"
"""ABC DEF"

By using the default escape character (backslash), the following CSV won't work with Azure Data Explorer:

"Hello, \"World\""
"ABC DEF"
"\"ABC\\D\"EF"
"\"ABC DEF"

Nested JSON objects

When copying a JSON file to Azure Data Explorer, note that:

  • Arrays aren't supported.
  • If your JSON structure contains object data types, Azure Data Factory flattens the object's child items and tries to map each child item to a different column in your Azure Data Explorer table. If you want the entire object item to be mapped to a single column in Azure Data Explorer, either:
    • Ingest the entire JSON row into a single dynamic column in Azure Data Explorer, or
    • Manually edit the pipeline definition by using Azure Data Factory's JSON editor (see the sketch after this list). In Mappings:
      • Remove the multiple mappings that were created for each child item, and add a single mapping that maps your object type to your table column.
      • After the closing square bracket, add a comma followed by:
        "mapComplexValuesToString": true.

Specify AdditionalProperties when copying to Azure Data Explorer

Note

This feature is currently available by manually editing the JSON payload.

Add a single row under the "sink" section of the copy activity, as follows:

"sink": {
    "type": "AzureDataExplorerSink",
    "additionalProperties": "{\"tags\":\"[\\\"drop-by:account_FiscalYearID_2020\\\"]\"}"
},

Escaping of the value may be tricky. Use the following code snippet as a reference:

using System;
using System.Collections.Generic;
using Newtonsoft.Json;

class Program
{
    static void Main(string[] args)
    {
        // Ingestion properties to pass to the Azure Data Explorer sink.
        var additionalProperties = new Dictionary<string, string>();
        additionalProperties.Add("ignoreFirstRecord", "false");
        additionalProperties.Add("csvMappingReference", "Table1_mapping_1");
        IEnumerable<string> ingestIfNotExists = new List<string> { "Part0001" };
        additionalProperties.Add("ingestIfNotExists", JsonConvert.SerializeObject(ingestIfNotExists));
        IEnumerable<string> tags = new List<string> { "ingest-by:Part0001", "ingest-by:IngestedByTest" };
        additionalProperties.Add("tags", JsonConvert.SerializeObject(tags));

        // Serialize the dictionary; the printed string is the value to use for "additionalProperties".
        var additionalPropertiesForPayload = JsonConvert.SerializeObject(additionalProperties);
        Console.WriteLine(additionalPropertiesForPayload);
        Console.ReadLine();
    }
}

The printed value:

{"ignoreFirstRecord":"false","csvMappingReference":"Table1_mapping_1","ingestIfNotExists":"[\"Part0001\"]","tags":"[\"ingest-by:Part0001\",\"ingest-by:IngestedByTest\"]"}

Next steps