Data Flow activity in Azure Data Factory

Applies to: Azure Data Factory, Azure Synapse Analytics

Use the Data Flow activity to transform and move data via mapping data flows. If you're new to data flows, see the Mapping Data Flow overview.

Syntax

{
    "name": "MyDataFlowActivity",
    "type": "ExecuteDataFlow",
    "typeProperties": {
      "dataflow": {
         "referenceName": "MyDataFlow",
         "type": "DataFlowReference"
      },
      "compute": {
         "coreCount": 8,
         "computeType": "General"
      },
      "traceLevel": "Fine",
      "runConcurrently": true,
      "continueOnError": true,      
      "staging": {
          "linkedService": {
              "referenceName": "MyStagingLinkedService",
              "type": "LinkedServiceReference"
          },
          "folderPath": "my-container/my-folder"
      },
      "integrationRuntime": {
          "referenceName": "MyDataFlowIntegrationRuntime",
          "type": "IntegrationRuntimeReference"
      }
    }
}

Type properties

| Property | Description | Allowed values | Required |
| --- | --- | --- | --- |
| dataflow | The reference to the Data Flow being executed | DataFlowReference | Yes |
| integrationRuntime | The compute environment the data flow runs on. If not specified, the auto-resolve Azure integration runtime is used. | IntegrationRuntimeReference | No |
| compute.coreCount | The number of cores used in the Spark cluster. Can only be specified if the auto-resolve Azure integration runtime is used. | 8, 16, 32, 48, 80, 144, 272 | No |
| compute.computeType | The type of compute used in the Spark cluster. Can only be specified if the auto-resolve Azure integration runtime is used. | "General", "ComputeOptimized", "MemoryOptimized" | No |
| staging.linkedService | If you're using an Azure Synapse Analytics source or sink, specify the storage account used for PolyBase staging. If your Azure Storage is configured with a VNet service endpoint, you must use managed identity authentication with "Allow trusted Microsoft services" enabled on the storage account; refer to Impact of using VNet service endpoints with Azure Storage. Also learn the needed configurations for Azure Blob and Azure Data Lake Storage Gen2, respectively. | LinkedServiceReference | Only if the data flow reads from or writes to Azure Synapse Analytics |
| staging.folderPath | If you're using an Azure Synapse Analytics source or sink, the folder path in the Blob storage account used for PolyBase staging | String | Only if the data flow reads from or writes to Azure Synapse Analytics |
| traceLevel | Set the logging level of your data flow activity execution | Fine, Coarse, None | No |

Execute Data Flow

Dynamically size data flow compute at runtime

The Core Count and Compute Type properties can be set dynamically to adjust to the size of your incoming source data at runtime. Use pipeline activities like Lookup or Get Metadata to find the size of the source dataset data. Then, use Add Dynamic Content in the Data Flow activity properties.

Screenshot showing dynamic data flow compute settings
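As a minimal sketch of what the resulting compute block could look like, the expression below picks a larger core count when a preceding Lookup activity reports a large source; the Lookup activity name GetSourceSize and its rowCount field are hypothetical and would come from your own pipeline.

"compute": {
    "coreCount": {
        "value": "@if(greater(activity('GetSourceSize').output.firstRow.rowCount, 10000000), 32, 8)",
        "type": "Expression"
    },
    "computeType": "General"
}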

Data Flow integration runtime

Choose which Integration Runtime to use for your Data Flow activity execution. By default, Data Factory uses the auto-resolve Azure integration runtime with four worker cores. This IR has a general purpose compute type and runs in the same region as your factory. For operationalized pipelines, it is highly recommended that you create your own Azure Integration Runtimes that define specific regions, compute type, core counts, and TTL for your data flow activity execution.

A minimum compute type of General Purpose (compute optimized is not recommended for large workloads) with an 8+8 configuration (16 total v-cores) and a 10-minute time to live is the minimum recommendation for most production workloads. By setting a small TTL, the Azure IR can maintain a warm cluster that doesn't incur the several minutes of start time of a cold cluster. You can speed up the execution of your data flows even more by selecting "Quick re-use" in the Azure IR data flow configurations. For more information, see Azure integration runtime.

Screenshot showing Azure Integration Runtime selection

Important

The Integration Runtime selection in the Data Flow activity only applies to triggered executions of your pipeline. Debugging your pipeline with data flows runs on the cluster specified in the debug session.
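As a rough sketch (not an exhaustive definition), a managed Azure IR tuned along the lines of the recommendation above might look like the following; the region shown is only an example, and the property names should be verified against the JSON your own factory generates.

{
    "name": "MyDataFlowIntegrationRuntime",
    "properties": {
        "type": "Managed",
        "typeProperties": {
            "computeProperties": {
                "location": "West US 2",
                "dataFlowProperties": {
                    "computeType": "General",
                    "coreCount": 16,
                    "timeToLive": 10
                }
            }
        }
    }
}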

PolyBase

If you're using Azure Synapse Analytics as a sink or source, you must choose a staging location for your PolyBase batch load. PolyBase allows for batch loading in bulk instead of loading the data row by row, which drastically reduces the load time into Azure Synapse Analytics.

Logging level

If you don't require every pipeline execution of your data flow activities to fully log all verbose telemetry logs, you can optionally set your logging level to "Basic" or "None". When executing your data flows in "Verbose" mode (the default), you are requesting ADF to fully log activity at each individual partition level during your data transformation. This can be an expensive operation, so enabling verbose logging only when troubleshooting can improve your overall data flow and pipeline performance. "Basic" mode only logs transformation durations, while "None" only provides a summary of durations.

Screenshot showing the logging level setting
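In the activity JSON, the logging level is expressed through the traceLevel property shown in the syntax above; the UI options Verbose, Basic, and None appear to correspond to the JSON values Fine, Coarse, and None respectively. A minimal sketch:

"typeProperties": {
    "dataflow": {
        "referenceName": "MyDataFlow",
        "type": "DataFlowReference"
    },
    "traceLevel": "Coarse"
}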

Sink properties

The grouping feature in data flows allows you to both set the order of execution of your sinks and to group sinks together using the same group number. To help manage groups, you can ask ADF to run sinks in the same group in parallel. You can also set a sink group to continue even after one of its sinks encounters an error.

The default behavior of data flow sinks is to execute each sink sequentially, in a serial manner, and to fail the data flow when an error is encountered in a sink. Additionally, all sinks default to the same group unless you go into the data flow properties and set different priorities for the sinks.

Screenshot showing sink properties
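At the activity level, the parallel-execution and continue-on-error behaviors appear to correspond to the runConcurrently and continueOnError flags shown in the syntax example above; a minimal sketch:

"typeProperties": {
    "dataflow": {
        "referenceName": "MyDataFlow",
        "type": "DataFlowReference"
    },
    "runConcurrently": true,
    "continueOnError": true
}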

Parameterizing data flows

Parameterized datasets

If your data flow uses parameterized datasets, set the parameter values in the Settings tab.

Screenshot showing Execute Data Flow parameters

Parameterized data flows

If your data flow is parameterized, set the dynamic values of the data flow parameters in the Parameters tab. You can use either the ADF pipeline expression language or the data flow expression language to assign dynamic or literal parameter values. For more information, see Data Flow Parameters.

Parameterized compute properties

You can parameterize the core count or compute type if you use the auto-resolve Azure integration runtime and specify values for compute.coreCount and compute.computeType.

Screenshot showing an Execute Data Flow parameter example
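For instance, a minimal sketch in which the core count and compute type are driven by hypothetical pipeline parameters named dataFlowCoreCount and dataFlowComputeType:

"compute": {
    "coreCount": {
        "value": "@pipeline().parameters.dataFlowCoreCount",
        "type": "Expression"
    },
    "computeType": {
        "value": "@pipeline().parameters.dataFlowComputeType",
        "type": "Expression"
    }
}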

Pipeline debug of Data Flow activity

To execute a debug pipeline run with a Data Flow activity, you must switch on data flow debug mode via the Data Flow Debug slider on the top bar. Debug mode lets you run the data flow against an active Spark cluster. For more information, see Debug Mode.

Screenshot showing the location of the Debug button

The debug pipeline runs against the active debug cluster, not the integration runtime environment specified in the Data Flow activity settings. You can choose the debug compute environment when starting up debug mode.

Monitoring the Data Flow activity

The Data Flow activity has a special monitoring experience where you can view partitioning, stage time, and data lineage information. Open the monitoring pane via the eyeglasses icon under Actions. For more information, see Monitoring Data Flows.

Use Data Flow activity results in a subsequent activity

The Data Flow activity outputs metrics on the number of rows written to each sink and read from each source. These results are returned in the output section of the activity run result, in the JSON format shown below.

{
    "runStatus": {
        "metrics": {
            "<your sink name1>": {
                "rowsWritten": <number of rows written>,
                "sinkProcessingTime": <sink processing time in ms>,
                "sources": {
                    "<your source name1>": {
                        "rowsRead": <number of rows read>
                    },
                    "<your source name2>": {
                        "rowsRead": <number of rows read>
                    },
                    ...
                }
            },
            "<your sink name2>": {
                ...
            },
            ...
        }
    }
}

For example, to get the number of rows written to a sink named 'sink1' in an activity named 'dataflowActivity', use @activity('dataflowActivity').output.runStatus.metrics.sink1.rowsWritten.

To get the number of rows read from a source named 'source1' that was used in that sink, use @activity('dataflowActivity').output.runStatus.metrics.sink1.sources.source1.rowsRead.

Note

If a sink has zero rows written, it will not show up in the metrics. Existence can be verified using the contains function. For example, contains(activity('dataflowActivity').output.runStatus.metrics, 'sink1') checks whether any rows were written to sink1.
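As an illustration, a Set Variable activity could capture the count defensively by combining the two expressions above; the activity name CaptureRowsWritten and the string variable rowsWritten are hypothetical.

{
    "name": "CaptureRowsWritten",
    "type": "SetVariable",
    "dependsOn": [
        {
            "activity": "dataflowActivity",
            "dependencyConditions": [ "Succeeded" ]
        }
    ],
    "typeProperties": {
        "variableName": "rowsWritten",
        "value": {
            "value": "@string(if(contains(activity('dataflowActivity').output.runStatus.metrics, 'sink1'), activity('dataflowActivity').output.runStatus.metrics.sink1.rowsWritten, 0))",
            "type": "Expression"
        }
    }
}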

Next steps

See the control flow activities supported by Data Factory: