Azure 数据工厂中的数据流活动

适用于: Azure 数据工厂

使用数据流活动通过映射数据流来转换和移动数据。 如果你不熟悉数据流,请参阅映射数据流概述

语法

{
    "name": "MyDataFlowActivity",
    "type": "ExecuteDataFlow",
    "typeProperties": {
      "dataflow": {
         "referenceName": "MyDataFlow",
         "type": "DataFlowReference"
      },
      "compute": {
         "coreCount": 8,
         "computeType": "General"
      },
      "traceLevel": "Fine",
      "runConcurrently": true,
      "continueOnError": true,      
      "staging": {
          "linkedService": {
              "referenceName": "MyStagingLinkedService",
              "type": "LinkedServiceReference"
          },
          "folderPath": "my-container/my-folder"
      },
      "integrationRuntime": {
          "referenceName": "MyDataFlowIntegrationRuntime",
          "type": "IntegrationRuntimeReference"
      }
}

Type 属性

属性 说明 允许的值 必须
数据流 对正在执行的数据流的引用 DataFlowReference
integrationRuntime 运行数据流的计算环境。 如果未指定,将使用自动解析 Azure Integration Runtime。 IntegrationRuntimeReference
compute.coreCount Spark 群集中使用的内核数。 仅当使用自动解析 Azure Integration Runtime 时才能指定 8、16、32、48、80、144、272
compute.computeType Spark 群集中使用的计算类型。 仅当使用自动解析 Azure Integration Runtime 时才能指定 "General", "ComputeOptimized", "MemoryOptimized"
staging.linkedService 如果使用的是 Azure Synapse Analytics 源或接收器,请指定用于 PolyBase 暂存的存储帐户。

如果 Azure 存储配置了 VNet 服务终结点,则必须在存储帐户上启用“允许受信任的 Microsoft 服务”并使用托管标识身份验证,详见将 VNet 服务终结点与 Azure 存储配合使用的影响。 另请了解 Azure BlobAzure Data Lake Storage Gen2 的所需配置。
LinkedServiceReference 仅当数据流读取或写入 Azure Synapse Analytics 时
staging.folderPath 如果使用的是 Azure Synapse Analytics 源或接收器,则为 blob 存储帐户中用于 PolyBase 暂存的文件夹路径 字符串 仅当数据流读取或写入 Azure Synapse Analytics 时
traceLevel 设置数据流活动执行的日志记录级别 精细、粗略、无

执行数据流

在运行时动态调整数据流大小

可以动态设置“核心计数”和“计算类型”属性,以在运行时调整传入源数据的大小。 使用管道活动(例如“查找”或“获取元数据”),以便查找源数据集数据的大小。 然后,在“数据流”活动属性中使用“添加动态内容”。

动态数据流

数据流集成运行时

选择用于数据流活动执行的 Integration Runtime。 默认情况下,此服务将使用具有四个辅助角色核心的自动解析 Azure Integration Runtime。 此 IR 具有常规用途计算类型,并在与服务实例相同的区域中运行。 对于已运营化的管道,强烈建议创建你自己的 Azure Integration Runtime,用于定义执行数据流活动所需的特定区域、计算类型、核心计数和 TTL。

对于大多数生产工作负荷,建议至少使用一个采用 8+8(总计 16)个 v-Core 和 10 分钟配置的“常规用途”计算类型(对于大型工作负荷,建议不要使用“计算优化”计算类型)。 通过设置小型 TTL,Azure IR 可以维护一个热群集,它不会出现冷群集那种需要几分钟时间才能启动的情况。 通过在 Azure IR 数据流配置上选择“快速重复使用”,你还可以进一步加快数据流的执行速度。 有关详细信息,请参阅 Azure Integration Runtime

Azure Integration Runtime

重要

数据流活动中的 Integration Runtime 选择仅适用于管道的已触发执行。 如果使用数据流调试管道,则可在群集(在调试会话中指定)上运行。

PolyBase

如果使用 Azure Synapse Analytics 作为接收器或源,则必须为 PolyBase 批量加载选择一个暂存位置。 PolyBase 允许批量加载而不是逐行加载数据。 PolyBase 可大大减少 Azure Synapse Analytics 的加载时间。

日志记录级别

如果不需要数据流活动的每个管道执行完整地记录所有详细的遥测日志,则可根据需要将日志记录级别设置为“基本”或“无”。 在“详细”模式(默认)下执行数据流时,要求服务在数据转换期间完整地记录每个分区级别的活动。 该操作成本昂贵,因此仅在进行故障排除时启用“详细”模式可优化整体数据流和管道性能。 “基本”模式仅记录转换持续时间,而“无”模式仅提供持续时间的摘要。

日志记录级别

接收器属性

使用数据流中的分组功能,既可以设置接收器的执行顺序,又可以使用相同的组号将接收器分组在一起。 为帮助管理组,可以要求服务在同一组中并行运行接收器。 还可以将接收器组设置为继续,即使其中一个接收器遇到错误仍可这样设置。

数据流接收器的默认行为是以串行方式顺序执行每个接收器,并且在接收器中遇到错误时使数据流失败。 此外,除非进入数据流属性并为接收器设置不同的优先级,否则所有接收器均默认为同一组。

接收器属性

将数据流参数化

参数化数据集

如果数据流使用参数化数据集,请在“设置”选项卡中设置参数值。

执行数据流参数

参数化数据流

如果数据流已参数化,请在“参数”选项卡中设置数据流参数的动态值。可以使用管道表达式语言或数据流表达式语言来分配动态或文本参数值。 有关详细信息,请参阅数据流参数

参数化计算属性。

如果使用自动解析 Azure Integration Runtime 并为 compute.coreCount 和 compute.computeType 指定值,则可以将核心数或计算类型参数化。

执行数据流参数示例

数据流活动的管道调试

若要执行通过数据流活动运行的调试管道,则必须通过顶部栏中的“数据流调试”滑块来打开数据流调试模式。 借助调试模式,可以针对活动的 Spark 群集运行数据流。 有关详细信息,请参阅调试模式

显示“调试”按钮所在位置的屏幕截图

调试管道针对活动的调试群集运行,而不是针对“数据流”活动设置中指定的集成运行时环境运行。 在启动调试模式时,可以选择调试计算环境。

监视数据流活动

数据流活动具有特殊的监视体验,你可以在其中查看分区、暂存时间和数据世系信息。 通过“操作”下的眼镜图标打开监视窗格。 有关详细信息,请参阅监视数据流

使用数据流活动会导致后续活动

数据流活动输出有关写入每个接收器的行数和从每个源读取的行数的指标。 这些结果将在活动运行结果的 output 部分中返回。 返回的指标采用以下 json 格式。

{
    "runStatus": {
        "metrics": {
            "<your sink name1>": {
                "rowsWritten": <number of rows written>,
                "sinkProcessingTime": <sink processing time in ms>,
                "sources": {
                    "<your source name1>": {
                        "rowsRead": <number of rows read>
                    },
                    "<your source name2>": {
                        "rowsRead": <number of rows read>
                    },
                    ...
                }
            },
            "<your sink name2>": {
                ...
            },
            ...
        }
    }
}

例如,要获取在名为“dataflowActivity”的活动中写入接收器“sink1”的行数,可使用 @activity('dataflowActivity').output.runStatus.metrics.sink1.rowsWritten

要获取从该接收器中使用的源“source1”读取的行数,可使用 @activity('dataflowActivity').output.runStatus.metrics.sink1.sources.source1.rowsRead

备注

如果接收器写入的行数为零,则它将不会显示在指标中。 可以使用 contains 函数来验证行是否存在。 例如,contains(activity('dataflowActivity').output.runStatus.metrics, 'sink1') 将检查是否存在写入 sink1 的任意行。

后续步骤

查看受支持的控制流活动: