使用 Azure 数据工厂命令活动运行 Azure 数据资源管理器控制命令Use Azure Data Factory command activity to run Azure Data Explorer control commands

Azure 数据工厂 (ADF) 是基于云的数据集成服务,可用于对数据执行一系列活动。Azure Data Factory (ADF) is a cloud-based data integration service that allows you to perform a combination of activities on the data. 使用 ADF 可以创建数据驱动式工作流用于协调和自动化数据移动与数据转换。Use ADF to create data-driven workflows for orchestrating and automating data movement and data transformation. 使用 Azure 数据工厂中的“Azure 数据资源管理器命令”活动,可以在 ADF 工作流中运行 Azure 数据资源管理器控制命令The Azure Data Explorer Command activity in Azure Data Factory enables you to run Azure Data Explorer control commands within an ADF workflow. 本文介绍如何使用 Lookup 活动和 ForEach 活动创建包含 Azure 数据资源管理器命令活动的管道。This article teaches you how to create a pipeline with a lookup activity and ForEach activity containing an Azure Data Explorer command activity.


创建新管道Create a new pipeline

  1. 选择“创作”铅笔图标。Select the Author pencil tool.

  2. 选择 + ,然后从下拉菜单中选择“管道”,以创建新的管道。Create a new pipeline by selecting + and then select Pipeline from the drop-down.


创建 Lookup 活动Create a Lookup activity

Lookup 活动可以从 Azure 数据工厂支持的任何数据源检索数据集。A lookup activity can retrieve a dataset from any Azure Data Factory-supported data sources. Lookup 活动的输出可以在 ForEach 或其他活动中使用。The output from Lookup activity can be used in a ForEach or other activity.

  1. 在“活动”窗格中的“常规”下,选择“Lookup”活动。 In the Activities pane, under General, select the Lookup activity. 将其拖放到右侧的主画布中。Drag and drop it into the main canvas on the right.

    选择 Lookup 活动

  2. 画布现在包含创建的 Lookup 活动。The canvas now contains the Lookup activity you created. 使用画布下面的选项卡更改任何相关参数。Use the tabs below the canvas to change any relevant parameters. 在“常规”中,将活动重命名。In General, rename the activity.

    编辑 Lookup 活动


    单击画布中的空白区域可查看管道属性。Click on the empty canvas area to view the pipeline properties. 使用“常规”选项卡可将管道重命名。Use the General tab to rename the pipeline. 示例中的管道命名为 pipeline-4-docsOur pipeline is named pipeline-4-docs.

在 Lookup 活动中创建 Azure 数据资源管理器数据集Create an Azure Data Explorer dataset in lookup activity

  1. 在“设置”中,选择预先创建的 Azure 数据资源管理器“源数据集”,或选择“+ 新建”以创建新的数据集。 In Settings, select your pre-created Azure Data Explorer Source dataset, or select + New to create a new dataset.

    在 Lookup 设置中添加数据集

  2. 在“新建数据集”窗口中选择“Azure 数据资源管理器(Kusto)”数据集。 Select the Azure Data Explorer (Kusto) dataset from New Dataset window. 选择“继续”以添加新数据集。Select Continue to add the new dataset.


  3. 新 Azure 数据资源管理器数据集参数将显示在“设置”中。The new Azure Data Explorer dataset parameters are visible in Settings. 若要更新参数,请选择“编辑”。To update the parameters, select Edit.

    Azure 数据资源管理器数据集的 Lookup 设置

  4. 主画布中将打开新选项卡“AzureDataExplorerTable”。The AzureDataExplorerTable new tab opens in the main canvas.

    • 选择“常规”并编辑数据集名称。Select General and edit the dataset name.
    • 选择“连接”以编辑数据集属性。Select Connection to edit the dataset properties.
    • 从下拉列表中选择“链接服务”,或选择“+ 新建”以创建新的链接服务。 Select the Linked service from the drop-down, or select + New to create a new linked service.

    编辑 Azure 数据资源管理器数据集属性

  5. 创建新的链接服务时,将打开“新建链接服务(Azure 数据资源管理器)”页:When creating a new linked service, the New Linked Service (Azure Data Explorer) page opens:

    ADX - 新建链接服务

    • 选择 Azure 数据资源管理器链接服务的 名称Select Name for Azure Data Explorer linked service. 根据需要添加 说明Add Description if needed.
    • 在“通过集成运行时进行连接”中,根据需要更改当前设置。In Connect via integration runtime, change current settings, if needed.
    • 在“帐户选择方法”中,使用以下两种方法之一选择群集:In Account selection method select your cluster using one of two methods:
      • 选中“从 Azure 订阅”单选按钮,并选择你的 Azure 订阅 帐户。Select the From Azure subscription radio button and select your Azure subscription account. 然后选择你的 群集Then, select your Cluster. 请注意,下拉列表中只会列出属于该用户的群集。Note the drop-down will only list clusters that belong to the user.
      • 应选择“手动输入”单选按钮并输入你的 终结点(群集 URL)。Instead, select Enter manually radio button and enter your Endpoint (cluster URL).
    • 指定 租户Specify the Tenant.
    • 输入 服务主体 IDEnter Service principal ID. 根据所用命令所需的权限级别,主体 ID 必须拥有足够的权限。The principal ID must have the adequate permissions, according to the permission level required by the command being used.
    • 选择“服务主体密钥”按钮并输入 服务主体密钥Select Service principal key button and enter Service Principal Key.
    • 从下拉菜单中选择你的 数据库Select your Database from the dropdown menu. 或者,选中“编辑”复选框并输入你的数据库名称。Alternatively, select Edit checkbox and enter your database name.
    • 选择“测试连接”以测试创建的链接服务连接。Select Test Connection to test the linked service connection you created. 如果可以连接到设置,会出现绿色的勾选标记“连接成功”。If you can connect to your setup, a green checkmark Connection successful will appear.
    • 选择“完成”以完成链接服务的创建过程。Select Finish to complete linked service creation.
  6. 设置链接服务后,在“AzureDataExplorerTable” > “连接”中添加表名称。 Once you've set up a linked service, In AzureDataExplorerTable > Connection, add Table name. 选择“预览数据”,确保数据正确呈现。Select Preview data, to make sure that the data is presented properly.

    数据集现已准备就绪,接下来可以继续编辑管道。Your dataset is now ready, and you can continue editing your pipeline.

将查询添加到 Lookup 活动Add a query to your lookup activity

  1. 在“pipeline-4-docs” > “设置”中的“查询”文本框内添加一个查询,例如: In pipeline-4-docs > Settings add a query in Query text box, for example:

    | where Database !in ("KustoMonitoringPersistentDatabase", "$systemdb")
    | summarize count() by Database
  2. 根据需要更改“查询超时”或“不截断”和“仅第一行”属性。 Change the Query timeout or No truncation and First row only properties, as needed. 在此流程中,我们保留了默认的“查询超时”,并取消选中了相应的复选框。In this flow, we keep the default Query timeout and uncheck the checkboxes.

    Lookup 活动的最终设置

创建 For-Each 活动Create a For-Each activity

For-Each 活动用于循环访问集合,并在循环中执行指定的活动。The For-Each activity is used to iterate over a collection and execute specified activities in a loop.

  1. 现在,将 For-Each 活动添加到管道。Now you add a For-Each activity to the pipeline. 此活动将处理 Lookup 活动返回的数据。This activity will process the data returned from the Lookup activity.

    • 在“活动”窗格中的“迭代和条件”下,选择“ForEach”活动并将其拖放到画布中。 In the Activities pane, under Iteration & Conditionals, select the ForEach activity and drag and drop it into the canvas.

    • 在画布中 Lookup 活动的输出与 ForEach 活动的输入之间绘制一条线,以将其连接起来。Draw a line between the output of the Lookup activity and the input of the ForEach activity in the canvas to connect them.

      ForEach 活动

  2. 在画布中选择 ForEach 活动。Select the ForEach activity in the canvas. 在下面的“设置”选项卡中:In the Settings tab below:

    • 选中“顺序”复选框以按顺序处理 Lookup 结果,或将其保留为未选中状态以创建并行处理。Check the Sequential checkbox for a sequential processing of the Lookup results, or leave it unchecked to create parallel processing.

    • 设置“批计数”。Set Batch count.

    • 在“项”中,提供对输出值的以下引用: @activity('Lookup1').output.valueIn Items, provide the following reference to the output value: @activity('Lookup1').output.value

      ForEach 活动设置

在 ForEach 活动中创建 Azure 数据资源管理器命令活动Create an Azure Data Explorer Command activity within the ForEach activity

  1. 在画布中双击 ForEach 活动,在新画布中将其打开,以指定 ForEach 中的活动。Double-click the ForEach activity in the canvas to open it in a new canvas to specify the activities within ForEach.

  2. 在“活动”窗格中的“Azure 数据资源管理器”下,选择“Azure 数据资源管理器命令”活动,并将其拖放到画布中。 In the Activities pane, under Azure Data Explorer, select the Azure Data Explorer Command activity and drag and drop it into the canvas.

    Azure 数据资源管理器命令活动

  3. 在“连接”选项卡中,选择前面创建的同一链接服务。In the Connection tab, select the same Linked Service previously created.

    Azure 数据资源管理器命令活动连接选项卡

  4. 在“命令”选项卡中提供以下命令:In the Command tab, provide the following command:

    async compressed
    into csv h"http://<storageName>.blob.core.chinacloudapi.cn/data/ClusterQueries;<storageKey>" with (
    <| ClusterQueries | where Database == "@{item().Database}"

    命令 指示 Azure 数据资源管理器将给定查询的结果以压缩格式导出到 Blob 存储中。The Command instructs Azure Data Explorer to export the results of a given query into a blob storage, in a compressed format. 该命令以异步方式运行(使用 async 修饰符)。It runs asynchronously (using the async modifier). 查询将寻址 Lookup 活动结果中每一行的数据库列。The query addresses the database column of each row in the Lookup activity result. “命令超时”可保持不变。The Command timeout can be left unchanged.



    命令活动具有以下限制:The command activity has the following limits:

    • 大小限制:1 MB 响应大小Size limit: 1 MB response size
    • 时间限制:20 分钟(默认),1 小时(最大)。Time limit: 20 minutes (default), 1 hour (maximum).
    • 如果需要,可以使用 AdminThenQuery 将查询追加到结果,以减少最终的大小/时间。If needed, you can append a query to the result using AdminThenQuery, to reduce resulting size/time.
  5. 管道现已准备就绪。Now the pipeline is ready. 可以单击管道名称返回到主管道视图。You can go back to the main pipeline view by clicking the pipeline name.

    Azure 数据资源管理器命令管道

  6. 在发布管道之前选择“调试”。Select Debug before publishing the pipeline. 可以在“输出”选项卡中监视管道进度。The pipeline progress can be monitored in the Output tab.

    Azure 数据资源管理器命令活动输出

  7. 可以依次选择“全部发布”和“添加触发器”来运行管道。 You can Publish All and then Add trigger to run the pipeline.

控制命令输出Control command outputs

下面详细说明了命令活动输出的结构。The structure of the command activity output is detailed below. 此输出可由管道中的下一个活动使用。This output can be used by the next activity in the pipeline.

非异步控制命令的返回值Returned value of a non-async control command

在非异步控制命令中,返回值的结构类似于 Lookup 活动结果的结构。In a non-async control command, the structure of the returned value is similar to the structure of the Lookup activity result. count 字段指示返回的记录数。The count field indicates the number of returned records. 固定数组字段 value 包含记录列表。A fixed array field value contains a list of records.

    "count": "2", 
    "value": [ 
            "ExtentId": "1b9977fe-e6cf-4cda-84f3-4a7c61f28ecd", 
            "ExtentSize": 1214.0, 
            "CompressedSize": 520.0 
            "ExtentId": "b897f5a3-62b0-441d-95ca-bf7a88952974", 
            "ExtentSize": 1114.0, 
            "CompressedSize": 504.0 

异步控制命令的返回值Returned value of an async control command

在异步控制命令中,活动在幕后将不断轮询操作表,直到异步操作完成或超时。因此,返回的值将包含给定 OperationId 属性的 .show operations OperationId 结果。In an async control command, the activity polls the operations table behind the scenes, until the async operation is completed or times-out. Therefore, the returned value will contain the result of .show operations OperationId for that given OperationId property. 请检查“状况”和“状态”属性的值,以确认操作是否成功完成。 Check the values of State and Status properties, to verify successful completion of the operation.

    "count": "1", 
    "value": [ 
            "OperationId": "910deeae-dd79-44a4-a3a2-087a90d4bb42", 
            "Operation": "TableSetOrAppend", 
            "NodeId": "", 
            "StartedOn": "2019-06-23T10:12:44.0371419Z", 
            "LastUpdatedOn": "2019-06-23T10:12:46.7871468Z", 
            "Duration": "00:00:02.7500049", 
            "State": "Completed", 
            "Status": "", 
            "RootActivityId": "f7c5aaaf-197b-4593-8ba0-e864c94c3c6f", 
            "ShouldRetry": false, 
            "Database": "MyDatabase", 
            "Principal": "<some principal id>", 
            "User": "<some User id>" 

后续步骤Next steps