Use custom activities in an Azure Data Factory pipeline

There are two types of activities that you can use in an Azure Data Factory pipeline: data movement activities and data transformation activities.

To move data to or from a data store that Data Factory does not support, or to transform or process data in a way that Data Factory does not support, you can create a Custom activity with your own data movement or transformation logic and use that activity in a pipeline. The Custom activity runs your customized code logic on an Azure Batch pool of virtual machines.

Note

This article has been updated to use the new Azure PowerShell Az module. You can still use the AzureRM module, which will continue to receive bug fixes until at least December 2020. To learn more about the new Az module and AzureRM compatibility, see Introducing the new Azure PowerShell Az module. For Az module installation instructions, see Install Azure PowerShell.

If you are new to the Azure Batch service, see the introductory Azure Batch articles.

Azure Batch linked service

The following JSON defines a sample Azure Batch linked service. For details, see Compute environments supported by Azure Data Factory.

{
    "name": "AzureBatchLinkedService",
    "properties": {
        "type": "AzureBatch",
        "typeProperties": {
            "accountName": "batchaccount",
            "accessKey": {
                "type": "SecureString",
                "value": "access key"
            },
            "batchUri": "https://batchaccount.region.batch.azure.cn",
            "poolName": "poolname",
            "linkedServiceName": {
                "referenceName": "StorageLinkedService",
                "type": "LinkedServiceReference"
            }
        }
    }
}

To learn more about the Azure Batch linked service, see the Compute linked services article.

Custom activity

The following JSON snippet defines a pipeline with a simple Custom activity. The activity definition has a reference to the Azure Batch linked service.

{
  "name": "MyCustomActivityPipeline",
  "properties": {
    "description": "Custom activity sample",
    "activities": [{
      "type": "Custom",
      "name": "MyCustomActivity",
      "linkedServiceName": {
        "referenceName": "AzureBatchLinkedService",
        "type": "LinkedServiceReference"
      },
      "typeProperties": {
        "command": "helloworld.exe",
        "folderPath": "customactv2/helloworld",
        "resourceLinkedService": {
          "referenceName": "StorageLinkedService",
          "type": "LinkedServiceReference"
        }
      }
    }]
  }
}

In this sample, helloworld.exe is a custom application stored in the customactv2/helloworld folder of the Azure Storage account used in resourceLinkedService. The Custom activity submits this custom application to be executed on Azure Batch. You can replace the command with any preferred application that can be executed on the target operating system of the Azure Batch pool nodes.

The following table describes the names and descriptions of the properties that are specific to this activity.

| Property | Description | Required |
| --- | --- | --- |
| name | Name of the activity in the pipeline. | Yes |
| description | Text describing what the activity does. | No |
| type | For the Custom activity, the activity type is Custom. | Yes |
| linkedServiceName | Linked service to Azure Batch. To learn about this linked service, see the Compute linked services article. | Yes |
| command | Command of the custom application to be executed. If the application is already available on the Azure Batch pool node, resourceLinkedService and folderPath can be skipped. For example, you can specify the command as cmd /c dir, which is natively supported by the Windows Batch pool node. | Yes |
| resourceLinkedService | Azure Storage linked service to the storage account where the custom application is stored. | No* |
| folderPath | Path to the folder of the custom application and all its dependencies. If you have dependencies stored in subfolders (that is, in a hierarchical folder structure under folderPath), the folder structure is currently flattened when the files are copied to Azure Batch; all files are copied into a single folder with no subfolders. To work around this behavior, consider compressing the files, copying the compressed file, and then unzipping it with custom code in the desired location. | No* |
| referenceObjects | An array of existing linked services and datasets. The referenced linked services and datasets are passed to the custom application in JSON format, so your custom code can reference resources of the data factory. | No |
| extendedProperties | User-defined properties that can be passed to the custom application in JSON format, so your custom code can reference additional properties. | No |
| retentionTimeInDays | The retention time for the files submitted for the Custom activity. The default value is 30 days. | No |

* The properties resourceLinkedService and folderPath must either both be specified or both be omitted.
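The folderPath workaround described above (compress the dependency tree, copy the single archive, and unzip it at runtime) can be sketched as follows. This is a minimal illustration using Python's standard library; the archive name `dependencies.zip` and the target directory are hypothetical, not names Data Factory requires.

```python
import os
import zipfile


def unpack_dependencies(archive_path: str, target_dir: str) -> list:
    """Restore the folder hierarchy that the copy to Azure Batch flattened.

    The Custom activity copies the archive as a single file; unzipping it at
    the start of your custom application recreates the subfolder structure
    locally. Returns the list of member paths that were extracted.
    """
    os.makedirs(target_dir, exist_ok=True)
    with zipfile.ZipFile(archive_path) as zf:
        zf.extractall(target_dir)
        return zf.namelist()
```

Your custom application would call this once at startup, before referencing any files that originally lived in subfolders.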

Note

If you pass linked services as referenceObjects in a Custom activity, it is a good security practice to pass an Azure Key Vault enabled linked service (because it does not contain any secure strings) and fetch the credentials directly from Key Vault in your code by using the secret name. You can find an example here that references an AKV enabled linked service, retrieves the credentials from Key Vault, and then accesses the storage in the code.
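The first step of that pattern can be sketched in code: locate the Key Vault linked service that was passed via referenceObjects by reading the linkedServices.json file deployed next to your executable. The file shape assumed below (an array of linked-service definitions with a typeProperties.baseUrl field) follows the linked-service JSON used elsewhere in this article; verify it against your own payload.

```python
import json


def get_key_vault_url(linked_services_path: str = "linkedServices.json") -> str:
    """Return the vault base URL of the first Key Vault linked service
    found in the referenceObjects payload.

    Field names mirror the AzureKeyVault linked-service JSON shape
    (properties.type == "AzureKeyVault", typeProperties.baseUrl); check
    them against the actual file your activity receives.
    """
    with open(linked_services_path) as f:
        services = json.load(f)
    for svc in services:
        if svc["properties"]["type"] == "AzureKeyVault":
            return svc["properties"]["typeProperties"]["baseUrl"]
    raise LookupError("No Key Vault linked service found in referenceObjects")


# With the vault URL in hand, the secret itself would then be fetched with the
# azure-keyvault-secrets SDK, for example:
#   SecretClient(vault_url, DefaultAzureCredential()).get_secret("mySecret").value
```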

Custom activity permissions

The Custom activity sets the Azure Batch auto-user account to Non-admin access with task scope (the default auto-user specification). You can't change the permission level of the auto-user account. For more information, see Run tasks under user accounts in Batch | Auto-user accounts.

Executing commands

You can directly execute a command by using a Custom activity. The following example runs the "echo hello world" command on the target Azure Batch pool nodes and prints the output to stdout.

{
  "name": "MyCustomActivity",
  "properties": {
    "description": "Custom activity sample",
    "activities": [{
      "type": "Custom",
      "name": "MyCustomActivity",
      "linkedServiceName": {
        "referenceName": "AzureBatchLinkedService",
        "type": "LinkedServiceReference"
      },
      "typeProperties": {
        "command": "cmd /c echo hello world"
      }
    }]
  }
}

Passing objects and properties

This sample shows how you can use referenceObjects and extendedProperties to pass Data Factory objects and user-defined properties to your custom application.

{
  "name": "MyCustomActivityPipeline",
  "properties": {
    "description": "Custom activity sample",
    "activities": [{
      "type": "Custom",
      "name": "MyCustomActivity",
      "linkedServiceName": {
        "referenceName": "AzureBatchLinkedService",
        "type": "LinkedServiceReference"
      },
      "typeProperties": {
        "command": "SampleApp.exe",
        "folderPath": "customactv2/SampleApp",
        "resourceLinkedService": {
          "referenceName": "StorageLinkedService",
          "type": "LinkedServiceReference"
        },
        "referenceObjects": {
          "linkedServices": [{
            "referenceName": "AzureBatchLinkedService",
            "type": "LinkedServiceReference"
          }]
        },
        "extendedProperties": {
          "connectionString": {
            "type": "SecureString",
            "value": "aSampleSecureString"
          },
          "PropertyBagPropertyName1": "PropertyBagValue1",
          "propertyBagPropertyName2": "PropertyBagValue2",
          "dateTime1": "2015-04-12T12:13:14Z"
        }
      }
    }]
  }
}

When the activity is executed, referenceObjects and extendedProperties are stored in the following files, which are deployed to the same execution folder as SampleApp.exe:

  • activity.json

    Stores the extendedProperties and properties of the custom activity.

  • linkedServices.json

    Stores an array of the linked services defined in the referenceObjects property.

  • datasets.json

    Stores an array of the datasets defined in the referenceObjects property.

The following sample code demonstrates how SampleApp.exe can access the required information from the JSON files:

using Newtonsoft.Json;
using System;
using System.IO;

namespace SampleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            // From extended properties
            dynamic activity = JsonConvert.DeserializeObject(File.ReadAllText("activity.json"));
            Console.WriteLine(activity.typeProperties.extendedProperties.connectionString.value);

            // From LinkedServices
            dynamic linkedServices = JsonConvert.DeserializeObject(File.ReadAllText("linkedServices.json"));
            Console.WriteLine(linkedServices[0].properties.typeProperties.accountName);
        }
    }
}

Retrieve execution outputs

You can start a pipeline run by using the following PowerShell command:

$runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName

While the pipeline is running, you can check the execution output by using the following commands:

while ($True) {
    $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30)

    if(!$result) {
        Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow"
    }
    elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) {
        Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow"
    }
    else {
        Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow"
        $result
        break
    }
    ($result | Format-List | Out-String)
    Start-Sleep -Seconds 15
}

Write-Host "Activity `Output` section:" -foregroundcolor "Yellow"
$result.Output -join "`r`n"

Write-Host "Activity `Error` section:" -foregroundcolor "Yellow"
$result.Error -join "`r`n"

The stdout and stderr of your custom application are saved to the adfjobs container, under a folder named with the task's GUID, in the Azure Storage linked service that you defined when creating the Azure Batch linked service. You can get the detailed path from the activity run output, as shown in the following snippet:

Pipeline ' MyCustomActivity' run finished. Result:

ResourceGroupName : resourcegroupname
DataFactoryName   : datafactoryname
ActivityName      : MyCustomActivity
PipelineRunId     : xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
PipelineName      : MyCustomActivity
Input             : {command}
Output            : {exitcode, outputs, effectiveIntegrationRuntime}
LinkedServiceName :
ActivityRunStart  : 10/5/2017 3:33:06 PM
ActivityRunEnd    : 10/5/2017 3:33:28 PM
DurationInMs      : 21203
Status            : Succeeded
Error             : {errorCode, message, failureType, target}

Activity Output section:
"exitcode": 0
"outputs": [
  "https://<container>.blob.core.chinacloudapi.cn/adfjobs/<GUID>/output/stdout.txt",
  "https://<container>.blob.core.chinacloudapi.cn/adfjobs/<GUID>/output/stderr.txt"
]
"effectiveIntegrationRuntime": "DefaultIntegrationRuntime (China East 2)"
Activity Error section:
"errorCode": ""
"message": ""
"failureType": ""
"target": "MyCustomActivity"

If you want to consume the content of stdout.txt in downstream activities, you can get the path to the stdout.txt file with the expression "@activity('MyCustomActivity').output.outputs[0]".
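If your own code receives the activity run output shown above as JSON (for example, from the PowerShell monitoring script or a REST call), the same fields can be extracted programmatically. A minimal sketch, assuming the Output section has exactly the shape shown in the snippet above:

```python
def summarize_custom_activity(output: dict) -> dict:
    """Pick out the fields a downstream step typically needs from the
    Output section of a Custom activity run.

    By the convention shown above, outputs[0] is stdout.txt and outputs[1]
    is stderr.txt, matching the Data Factory expression
    @activity('MyCustomActivity').output.outputs[0].
    """
    return {
        "succeeded": output["exitcode"] == 0,
        "stdout_url": output["outputs"][0],
        "stderr_url": output["outputs"][1],
    }
```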

Important

  • activity.json, linkedServices.json, and datasets.json are stored in the runtime folder of the Batch task. For this example, they are stored in the "https://adfv2storage.blob.core.chinacloudapi.cn/adfjobs/<GUID>/runtime/" path. If needed, you need to clean them up separately.
  • For linked services that use the self-hosted integration runtime, sensitive information such as keys or passwords is encrypted by the self-hosted integration runtime to ensure that credentials stay in the customer-defined private network environment. Some sensitive fields could be missing when referenced by your custom application code in this way. If needed, use a SecureString in extendedProperties instead of a linked service reference.

Pass outputs to another activity

You can send custom values from your code in a Custom activity back to Azure Data Factory. You can do so by writing them into outputs.json from your application. Data Factory copies the content of outputs.json and appends it to the activity output as the value of the customOutput property. (The size limit is 2 MB.) If you want to consume the content of outputs.json in downstream activities, you can get the value by using the expression @activity('<MyCustomActivity>').output.customOutput.
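Producing that file from your custom application is straightforward. A sketch in Python (the helper name and the size check are illustrative; Data Factory only requires a valid JSON file named outputs.json in the task's working directory):

```python
import json


def write_custom_output(values: dict, path: str = "outputs.json") -> None:
    """Write values for Data Factory to surface as the activity's
    customOutput property.

    Data Factory picks up outputs.json from the task's working directory;
    the 2-MB limit mentioned in the article is enforced here as a guard
    before the file is written.
    """
    payload = json.dumps(values)
    if len(payload.encode("utf-8")) > 2 * 1024 * 1024:
        raise ValueError("outputs.json payload exceeds the 2-MB limit")
    with open(path, "w") as f:
        f.write(payload)
```

A downstream activity would then read these values through @activity('<MyCustomActivity>').output.customOutput.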

Retrieve SecureString outputs

Sensitive property values designated as type SecureString, as shown in some of the examples in this article, are masked out in the Monitoring tab in the Data Factory user interface. In actual pipeline execution, however, a SecureString property is serialized as plain-text JSON within the activity.json file. For example:

"extendedProperties": {
  "connectionString": {
    "type": "SecureString",
    "value": "aSampleSecureString"
  }
}

This serialization is not truly secure, and it is not intended to be secure. The intent is to hint to Data Factory to mask the value in the Monitoring tab.

To access properties of type SecureString from a custom activity, read the activity.json file, which is placed in the same folder as your .EXE, deserialize the JSON, and then access the JSON property (extendedProperties => [propertyName] => value).
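Those steps can be sketched in a few lines. The file layout assumed here (typeProperties => extendedProperties => [propertyName] => value) matches the activity.json shape described in this article; the helper name is illustrative.

```python
import json


def read_secure_property(name: str, activity_path: str = "activity.json") -> str:
    """Read a SecureString extended property from the activity.json file
    deployed next to the executable.

    Follows the path extendedProperties => [name] => value; plain string
    extended properties (non-SecureString) are returned as-is.
    """
    with open(activity_path) as f:
        activity = json.load(f)
    prop = activity["typeProperties"]["extendedProperties"][name]
    return prop["value"] if isinstance(prop, dict) else prop
```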

Compare the v2 Custom activity and the version 1 (Custom) DotNet activity

In the Azure Data Factory V2 Custom activity, you are not required to implement a .NET interface. You can now directly run commands, scripts, and your own custom code compiled as an executable. To configure this implementation, specify the command property together with the folderPath property. The Custom activity uploads the executable and its dependencies to folderPath and executes the command for you.

The linked services and datasets (defined in referenceObjects) and the extended properties defined in the JSON payload of a Data Factory v2 Custom activity can be accessed by your executable as JSON files. You can access the required properties by using a JSON serializer, as shown in the preceding SampleApp.exe code sample.

With the changes introduced in the Data Factory V2 Custom activity, you can write your custom code logic in your preferred language and execute it on the Windows and Linux operating systems supported by Azure Batch.

Auto-scaling of Azure Batch

You can also create an Azure Batch pool with the autoscale feature. For example, you could create an Azure Batch pool with zero dedicated VMs and an autoscale formula based on the number of pending tasks.

The sample formula here achieves the following behavior: When the pool is initially created, it starts with one VM. The $PendingTasks metric defines the number of tasks in the running plus active (queued) state. The formula finds the average number of pending tasks in the last 180 seconds and sets TargetDedicated accordingly. It ensures that TargetDedicated never goes beyond 25 VMs. So, as new tasks are submitted, the pool automatically grows, and as tasks complete, VMs become free one by one and autoscaling shrinks the pool. You can adjust startingNumberOfVMs and maxNumberofVMs to your needs.

Autoscale formula:

startingNumberOfVMs = 1;
maxNumberofVMs = 25;
pendingTaskSamplePercent = $PendingTasks.GetSamplePercent(180 * TimeInterval_Second);
pendingTaskSamples = pendingTaskSamplePercent < 70 ? startingNumberOfVMs : avg($PendingTasks.GetSample(180 * TimeInterval_Second));
$TargetDedicated=min(maxNumberofVMs,pendingTaskSamples);
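The formula's decision logic can be mirrored in ordinary code to reason about it. This is a sketch of the same arithmetic, not Batch's evaluation engine; the function name and parameters are illustrative.

```python
def evaluate_autoscale(pending_task_samples: list,
                       sample_percent: float,
                       starting_vms: int = 1,
                       max_vms: int = 25) -> int:
    """Mirror the autoscale formula above.

    If fewer than 70% of the 180-second window produced $PendingTasks
    samples, fall back to the starting pool size; otherwise target the
    average number of pending tasks. The result is always capped at
    max_vms, as in the min(...) step of the formula.
    """
    if sample_percent < 70:
        pending = starting_vms
    else:
        pending = sum(pending_task_samples) / len(pending_task_samples)
    return min(max_vms, int(pending))
```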

For details, see Automatically scale compute nodes in an Azure Batch pool.

If the pool uses the default autoScaleEvaluationInterval, the Batch service could take 15 to 30 minutes to prepare the VM before running the custom activity. If the pool uses a different autoScaleEvaluationInterval, the Batch service could take autoScaleEvaluationInterval + 10 minutes.

Next steps

See the articles that explain how to transform data in other ways.