适用于:Azure 数据工厂
Azure Synapse Analytics
数据流在 Azure 数据工厂管道和 Azure Synapse Analytics 管道中都可用。 本文适用于映射数据流。 如果你不熟悉转换,请参阅介绍性文章: 使用映射数据流转换数据。
转换数据后,使用接收器转换将其写入目标存储。 每个数据流至少需要一个接收器转换,但你可根据需要写入任意多个接收器来完成转换流。 若要写入更多接收器,请通过新的分支和有条件拆分创建新数据流。
每个接收器转换只与一个数据集对象或链接服务相关联。 接收器转换决定要写入的数据的形状和位置。
创建接收器转换时,请选择是在数据集对象内还是在接收器转换中定义接收器信息。 大多数格式只适用于其中之一。 若要了解如何使用特定连接器,请参阅相应的连接器文档。
内联和数据集对象同时支持某一格式时,两者都有优势。 数据集对象是可重用的实体,可在其他数据流和复制等活动中使用。 使用强化的架构时,这些可重复使用的实体特别有用。 数据集不基于 Spark。 有时,可能需要在接收器转换中覆盖某些设置或架构投影。
如果使用灵活的架构、一次性接收器实例或参数化接收器,建议使用内联数据集。 如果接收器参数化程度高,则使用内联数据集可以不创建“虚拟”对象。 内联数据集基于 Spark,其属性是数据流原生的。
若要使用内联数据集,请在“接收器类型”选择器中选择所需的格式。 选择要与其连接的链接服务,而不是选择接收器数据集。
映射数据流遵循提取、加载和转换 (ELT) 方法,可与全部位于 Azure 中的暂存数据集结合使用。 目前,以下数据集可用于接收器转换。
提示
接收器的格式可能与源不同。 这是如何从一种格式转换为另一种格式的一个步骤。 例如,从 CSV 到 Parquet 接收器。 可能需要在源和接收器之间进行数据流中的一些转换,这样才能正常工作。 (例如,Parquet 的标头要求比 CSV 更具体。
连接器 | 格式 | 数据集/内联 |
---|---|---|
Azure Blob 存储 |
Avro 带分隔符的文本 增量 JSON ORC 拼花地板 |
✓/✓ ✓/✓ -/✓ ✓/✓ ✓/✓ ✓/✓ |
用于 NoSQL 的 Azure Cosmos DB | ✓/- | |
Azure Data Lake Storage Gen2 |
Avro 常见数据模型 带分隔符的文本 增量 JSON ORC 拼花地板 |
✓/✓ -/✓ ✓/✓ -/✓ ✓/✓ ✓/✓ ✓/✓ |
Azure Database for MySQL | ✓/✓ | |
Azure Database for PostgreSQL | ✓/✓ | |
Azure 数据资源管理器 | ✓/✓ | |
Azure SQL 数据库 | ✓/✓ | |
Azure SQL 托管实例 | ✓/- | |
Azure Synapse Analytics | ✓/- | |
Dataverse | ✓/✓ | |
Dynamics 365 | ✓/✓ | |
Dynamics CRM | ✓/✓ | |
SFTP |
Avro 带分隔符的文本 JSON ORC 拼花地板 |
✓/✓ ✓/✓ ✓/✓ ✓/✓ ✓/✓ |
雪花 | ✓/✓ | |
SQL Server | ✓/✓ |
这些连接器的特定设置位于“设置”选项卡上。有关这些设置的信息和数据流脚本示例位于连接器文档中。
服务可以访问超过 90 个原生连接器。 若要将数据从数据流写入其他源,请使用复制活动从受支持的接收器加载该数据。
添加接收器后,请通过“接收器”选项卡进行配置。可在此处选择或创建接收器所写入的数据集。 可在调试设置中配置数据集参数的开发值。 (必须打开调试模式。)
架构偏差:架构偏差是指在无需显式定义列更改的情况下,服务在数据流中以原生方式处理灵活架构的能力。 启用“允许架构偏差”在接收器数据架构中定义的内容之上写入额外的列。
验证架构:如果选择了验证架构,则如果接收器投影中的任一列在接收器存储中未找到,或者数据类型不匹配,数据流将失败。 使用此设置可强制要求接收器架构满足定义的投影的协定。 非常适用于在数据库接收器方案中指示列名称或类型已发生更改。
缓存接收器是指数据流将数据写入 Spark 缓存,而不是数据存储。 在映射数据流中,可以使用缓存查找在同一流中多次引用此数据。 想要引用数据作为表达式的一部分,但又不想显式地将列联接到其中时,这很有用。 缓存接收器的常见示例:查找数据存储中的最大值,将错误代码与错误消息数据库进行匹配。
若要写入缓存接收器,请添加接收器转换,然后选择“缓存”作为接收器类型。 与其他接收器类型不同,由于不写入外部存储,因此无需选择数据集或链接服务。
在接收器设置中,可以根据需要指定缓存接收器的键列。 在缓存查找中使用 lookup()
函数时,这些用作匹配条件。 如果指定键列,则不能在缓存查找中使用 outputs()
函数。 若要了解有关缓存查找语法的详细信息,请参阅缓存查找。
例如,如果在名为 column1
的缓存接收器中指定一个键列 cacheExample
,则调用 cacheExample#lookup()
时将有一个参数指定匹配缓存接收器中的哪一行。 函数输出一个复杂列,其中每个映射的列都有子列。
备注
缓存接收器必须位于数据流中,且该数据流必须完全独立于通过缓存查找对其进行引用的任意转换。 缓存接收器还必须是第一个写入的接收器。
写入活动输出
缓存接收器可以选择将其数据写入数据流活动的输出,然后该输出可用作管道中另一个活动的输入。 这样便可以快速轻松地将数据传入数据流活动,而无需将数据保存在数据存储中。
请注意,直接注入到管道中的数据流的输出限制为 2MB。 因此,数据流将尝试在不超出 2MB 限制的同时向输出添加尽可能多的行,因此有时你可能不会在活动输出中看到所有行。 在数据流活动级别设置“仅第一行”还有助于根据需要限制数据流的数据输出。
对于数据库接收器类型,“设置”选项卡将包含“更新方法”属性。 默认值为“插入”,但也包括用于更新、更新插入和删除的复选框选项。 若要利用这些附加选项,需要在接收器之前添加更改行转换。 “更改行”将允许你为每个数据库操作定义条件。 如果源是启用 CDC 的本机源,则可以设置不带“更改行”的更新方法,因为 ADF 已知道插入、更新、更新插入和删除的行标记。
与选择转换类似,可以在接收器的“映射”选项卡上决定要写入哪些传入列。 默认情况下,将映射所有输入列,包括偏移列。 此行为称为自动映射。
关闭自动映射时,可以添加基于固定列的映射或基于规则的映射。 使用基于规则的映射,可以编写具有模式匹配的表达式。 固定的映射会映射逻辑和物理列名称。 有关基于规则的映射的详细信息,请参阅映射数据流中的列模式。
默认情况下,数据以不确定的顺序写入多个接收器。 转换逻辑完成时,执行引擎并行写入数据,并且接收器排序可能会在每次运行时发生变化。 若要指定确切的接收器排序,请在数据流的“常规”选项卡上启用“自定义接收器排序” 。 启用后,接收器按递增顺序写入。
备注
使用缓存查询时,请确保接收器排序已将缓存接收器设置为 1,即排序中最低的一个(或第一个)。
可以对一系列接收器使用同一顺序编号来将接收器组合在一起。 服务会将这些接收器视为可并行执行的组。 管道数据流活动中将显示并行执行的选项。
在“接收器错误”选项卡上,可以配置错误行处理来捕获和重定向数据库驱动程序错误的输出和失败断言的输出。
写入数据库时,某些数据行可能会由于目标设置的约束而失败。 默认情况下,遇到第一个错误时,数据流运行将失败。 在某些连接器中,可以选择“出错时继续运行”,确保即使单个行存在错误,也可以完成数据流。 目前,此功能仅在 Azure SQL 数据库和 Azure Synapse 中可用。 有关详细信息,请参阅 Azure SQL DB 中的错误行处理。
对于断言失败行,你可以在数据流的上游使用“断言转换”,然后将失败的断言重定向到“接收器错误”选项卡中此处的输出文件。你还可以在此处选择忽略断言失败的行,而不将这些行输出到接收器目标数据存储。
在调试群集中获取数据预览时,不会将任何数据写入接收器。 将返回数据的快照,但不会将任何内容写入目标。 要测试将数据写入接收器,请从管道画布运行管道调试。
下面是接收器转换及其数据流脚本的一个示例:
sink(input(
movie as integer,
title as string,
genres as string,
year as integer,
Rating as integer
),
allowSchemaDrift: true,
validateSchema: false,
deletable:false,
insertable:false,
updateable:true,
upsertable:false,
keys:['movie'],
format: 'table',
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true,
saveOrder: 1,
errorHandlingOption: 'stopOnFirstError') ~> sink1
创建数据流后,请将数据流活动添加到管道。