Sink transformation in mapping data flow

Applies to: Azure Data Factory and Azure Synapse Analytics

After you finish transforming your data, write it into a destination store by using the sink transformation. Every data flow requires at least one sink transformation, but you can write to as many sinks as necessary to complete your transformation flow. To write to additional sinks, create new streams via new branches and conditional splits.

Each sink transformation is associated with exactly one Azure Data Factory dataset object or linked service. The sink transformation determines the shape and location of the data you want to write to.

Inline datasets

When you create a sink transformation, choose whether your sink information is defined inside a dataset object or within the sink transformation. Most formats are available in only one or the other. To learn how to use a specific connector, see the appropriate connector document.

When a format is supported both inline and in a dataset object, there are benefits to both. Dataset objects are reusable entities that can be used in other data flows and in activities such as Copy. These reusable entities are especially useful when you use a hardened schema. Datasets aren't based in Spark. Occasionally, you might need to override certain settings or the schema projection in the sink transformation.

Inline datasets are recommended when you use flexible schemas, one-off sink instances, or parameterized sinks. If your sink is heavily parameterized, inline datasets let you avoid creating a "dummy" object. Inline datasets are based in Spark, and their properties are native to data flow.

To use an inline dataset, select the format you want in the Sink type selector. Instead of selecting a sink dataset, you select the linked service you want to connect to.

Screenshot that shows Inline selected.
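
For a rough sense of the shape only (exact property names vary by connector and format, and are listed in each connector's documentation), an inline Delta sink in data flow script might look like the fragment below. The container and folder names are placeholders, and only a subset of properties is shown.

sink(allowSchemaDrift: true,
    validateSchema: false,
    format: 'delta',
    fileSystem: 'output',
    folderPath: 'movies',
    deletable: false,
    insertable: true,
    updateable: false,
    upsertable: false) ~> deltaSink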

Supported sink types

Mapping data flow follows an extract, load, and transform (ELT) approach and works with staging datasets that are all in Azure. Currently, the following datasets can be used in a sink transformation.

| Connector | Format | Dataset/inline |
| --- | --- | --- |
| Azure Blob Storage | Avro | ✓/- |
| | Delimited text | ✓/- |
| | Delta | -/✓ |
| | JSON | ✓/- |
| | ORC | ✓/✓ |
| | Parquet | ✓/- |
| Azure Cosmos DB (SQL API) | | ✓/- |
| Azure Data Lake Storage Gen2 | Avro | ✓/- |
| | Common Data Model | -/✓ |
| | Delimited text | ✓/- |
| | Delta | -/✓ |
| | JSON | ✓/- |
| | ORC | ✓/✓ |
| | Parquet | ✓/- |
| Azure Database for PostgreSQL | | ✓/✓ |
| Azure SQL Database | | ✓/- |
| Azure SQL Managed Instance (preview) | | ✓/- |
| Azure Synapse Analytics | | ✓/- |
| Snowflake | | ✓/✓ |

Settings specific to these connectors are located on the Settings tab. Information and data flow script examples on these settings are located in the connector documentation.

Azure Data Factory has access to more than 90 native connectors. To write data to those other sources from your data flow, use the Copy activity to load that data from a supported sink.

Sink settings

After you've added a sink, configure it on the Sink tab. Here you can pick or create the dataset your sink writes to. Development values for dataset parameters can be configured in Debug settings. (Debug mode must be turned on.)

Screenshot that shows Sink settings.

Schema drift: Schema drift is the ability of Data Factory to natively handle flexible schemas in your data flows without needing to explicitly define column changes. Enable Allow schema drift to write additional columns on top of what's defined in the sink data schema.

Validate schema: If Validate schema is selected, the data flow will fail if any column of the incoming source schema isn't found in the source projection, or if the data types don't match. Use this setting to enforce that the source data meets the contract of your defined projection. It's useful in database source scenarios to signal that column names or types have changed.
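
Both toggles also surface as properties on the sink in data flow script, as in the full sample at the end of this article. Abbreviated to just those two properties, the fragment looks like this:

sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1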

Cache sink

A cache sink is when a data flow writes data into the Spark cache instead of a data store. In mapping data flows, you can reference this data within the same flow many times using a cache lookup. This is useful when you want to reference data as part of an expression but don't want to explicitly join the columns to it. Common examples where a cache sink can help are looking up a max value on a data store and matching error codes to an error message database.

To write to a cache sink, add a sink transformation and select Cache as the sink type. Unlike other sink types, you don't need to select a dataset or linked service because you aren't writing to an external store.

Select cache sink
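
Abbreviated sketch only: because a cache sink writes to the Spark cache, its script contains no dataset or linked service reference. The fragment below shows just the key-column and write-order properties that this article discusses (the sink name cacheExample is reused from the example later in this section):

sink(keys:['column1'],
    saveOrder: 1) ~> cacheExample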

In the sink settings, you can optionally specify the key columns of the cache sink. These are used as matching conditions when using the lookup() function in a cache lookup. If you specify key columns, you can't use the outputs() function in a cache lookup. To learn more about the cache lookup syntax, see cached lookups.

Cache sink key columns

For example, if you specify a single key column of column1 in a cache sink called cacheExample, calling cacheExample#lookup() takes one parameter that specifies which row in the cache sink to match on. The function outputs a single complex column with subcolumns for each column mapped.
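
Continuing that example, an expression later in the same flow could read from the cache as sketched below. The column names errorCode, errorMessage, and maxValue are placeholders, and cacheNoKeys stands for a separate cache sink defined without key columns, since outputs() can't be combined with key columns:

cacheExample#lookup(errorCode).errorMessage

cacheNoKeys#outputs()[1].maxValue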

Note

A cache sink must be in a completely independent data stream from any transformation that references it via a cache lookup. A cache sink also must be the first sink written.

Field mapping

Similar to a select transformation, on the Mapping tab of the sink, you can decide which incoming columns will get written. By default, all input columns, including drifted columns, are mapped. This behavior is known as automapping.

When you turn off automapping, you can add either fixed column-based mappings or rule-based mappings. With rule-based mappings, you can write expressions with pattern matching. Fixed mapping maps logical and physical column names. For more information on rule-based mapping, see Column patterns in mapping data flow.

Custom sink ordering

By default, data is written to multiple sinks in a nondeterministic order. The execution engine writes data in parallel as the transformation logic is completed, and the sink ordering might vary each run. To specify an exact sink ordering, enable Custom sink ordering on the General tab of the data flow. When enabled, sinks are written sequentially in increasing order.

Screenshot that shows Custom sink ordering.

Note

When utilizing cached lookups, make sure that your sink ordering has the cached sinks set to 1, the lowest (or first) in ordering.

Custom sink ordering

Sink groups

You can group sinks together by applying the same order number for a series of sinks. ADF will treat those sinks as groups that can execute in parallel. Options for parallel execution will surface in the pipeline data flow activity.
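
In data flow script terms, the order number is the saveOrder property shown in the sample at the end of this article. As a sketch (properties abbreviated, names made up), two sinks that share the same saveOrder value form one group:

sink(allowSchemaDrift: true,
    validateSchema: false,
    saveOrder: 2) ~> movieSink

sink(allowSchemaDrift: true,
    validateSchema: false,
    saveOrder: 2) ~> ratingSink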

Error row handling

When writing to databases, certain rows of data may fail due to constraints set by the destination. By default, a data flow run will fail on the first error it gets. In certain connectors, you can choose Continue on error, which allows your data flow to complete even if individual rows have errors. Currently, this capability is only available in Azure SQL Database. For more information, see error row handling in Azure SQL DB.
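
In data flow script, this choice corresponds to the errorHandlingOption property. The sample at the end of this article uses 'stopOnFirstError'; switching to the continue-on-error behavior is sketched below, with the value name inferred from that pattern (check the error row handling article for the exact options):

sink(allowSchemaDrift: true,
    validateSchema: false,
    errorHandlingOption: 'continueOnError') ~> sqlSink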

Data preview in sink

When fetching a data preview in debug mode, no data will be written to your sink. A snapshot of what the data looks like will be returned, but nothing will be written to your destination. To test writing data into your sink, run a pipeline debug from the pipeline canvas.

Data flow script

Example

Below is an example of a sink transformation and its data flow script:

sink(input(
        movie as integer,
        title as string,
        genres as string,
        year as integer,
        Rating as integer
    ),
    allowSchemaDrift: true,
    validateSchema: false,
    deletable:false,
    insertable:false,
    updateable:true,
    upsertable:false,
    keys:['movie'],
    format: 'table',
    skipDuplicateMapInputs: true,
    skipDuplicateMapOutputs: true,
    saveOrder: 1,
    errorHandlingOption: 'stopOnFirstError') ~> sink1

Next steps

Now that you've created your data flow, add a data flow activity to your pipeline.