Data flow script (DFS)

Applies to: Azure Data Factory and Azure Synapse Analytics

Data flow script (DFS) is the underlying metadata, similar to a coding language, that is used to execute the transformations included in a mapping data flow. Every transformation is represented by a series of properties that provide the necessary information to run the job properly. The script is visible and editable from ADF by clicking the "script" button on the top ribbon of the browser UI.

Script button

For instance, allowSchemaDrift: true, in a source transformation tells the service to include all columns from the source dataset in the data flow, even if they are not included in the schema projection.
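
For example, a minimal source sketch with schema drift allowed looks like the following (the stream name source1 and the movie columns are placeholders borrowed from the examples later in this article):

source(output(
        movieId as string,
        title as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1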

Use cases

The DFS is automatically produced by the user interface. You can click the Script button to view and customize the script. You can also generate scripts outside of the ADF UI and then pass them into the PowerShell cmdlet. When debugging complex data flows, you may find it easier to scan the script code-behind instead of scanning the UI graph representation of your flows.

Here are a few example use cases:

  • Programmatically producing many data flows that are fairly similar, that is, "stamping out" data flows.
  • Complex expressions that are difficult to manage in the UI or that result in validation issues.
  • Debugging and better understanding the various errors returned during execution.

When you build a data flow script to use with PowerShell or an API, you must collapse the formatted text into a single line. You can keep tabs and newlines as escape characters, but the text must be formatted to fit inside a JSON property. There is a button at the bottom of the script editor UI that formats the script as a single line for you.
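
As an illustrative sketch (not the exact output of the editor's button), the simple source-to-sink flow used later in this article collapses to a single escaped line like the one below, which is assumed here to become the value of the script property in the data flow's JSON definition:

source(output(\n\t\tmovieId as string,\n\t\ttitle as string,\n\t\tgenres as string\n\t),\n\tallowSchemaDrift: true,\n\tvalidateSchema: false) ~> source1\nsource1 sink(allowSchemaDrift: true,\n\tvalidateSchema: false) ~> sink1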

Copy button

How to add transforms

Adding a transformation requires three basic steps: adding the core transformation data, rerouting the input stream, and then rerouting the output stream. This is easiest to see in an example. Let's say we start with a simple source-to-sink data flow like the following:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
source1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

If we decide to add a derive transformation, first we need to create the core transformation text, which has a simple expression to add a new uppercase column called upperCaseTitle:

derive(upperCaseTitle = upper(title)) ~> deriveTransformationName

Then, we take the existing DFS and add the transformation:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
source1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

And now we reroute the incoming stream by identifying which transformation we want the new transformation to come after (in this case, source1), and copying the name of that stream to the new transformation:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
source1 derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
source1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

Finally, we identify the transformation we want to come after this new transformation (in this case, sink1), and replace its input stream with the output stream name of our new transformation:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
source1 derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
deriveTransformationName sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

DFS fundamentals

The DFS is composed of a series of connected transformations, including sources, sinks, and various others that can add new columns, filter data, join data, and much more. Usually, the script will start with one or more sources, followed by many transformations, and end with one or more sinks.

Sources all have the same basic construction:

source(
  source properties
) ~> source_name

For instance, a simple source with three columns (movieId, title, genres) would be:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1

All transformations other than sources have the same basic construction:

name_of_incoming_stream transformation_type(
  properties
) ~> new_stream_name

For example, a simple derive transformation that takes a column (title) and overwrites it with an uppercase version would be as follows:

source1 derive(
  title = upper(title)
) ~> derive1

And a sink with no schema would simply be:

derive1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

Script snippets

Script snippets are shareable pieces of data flow script that you can reuse across data flows.

Aggregated summary stats

Add an Aggregate transformation named "SummaryStats" to your data flow, and then paste the code below in place of the existing SummaryStats aggregate definition in your script. This provides a generic pattern for data profile summary statistics.

aggregate(each(match(true()), $$+'_NotNull' = countIf(!isNull($$)), $$ + '_Null' = countIf(isNull($$))),
        each(match(type=='double'||type=='integer'||type=='short'||type=='decimal'), $$+'_stddev' = round(stddev($$),2), $$ + '_min' = min ($$), $$ + '_max' = max($$), $$ + '_average' = round(avg($$),2), $$ + '_variance' = round(variance($$),2)),
        each(match(type=='string'), $$+'_maxLength' = max(length($$)))) ~> SummaryStats

You can also use the sample below to count the number of unique rows and the number of distinct rows in your data. The example below can be pasted into a data flow with an Aggregate transformation called ValueDistAgg. This example uses a column called "title". Be sure to replace "title" with the string column in your data that you want to use to get value counts.

aggregate(groupBy(title),
    countunique = count()) ~> ValueDistAgg
ValueDistAgg aggregate(numofunique = countIf(countunique==1),
        numofdistinct = countDistinct(title)) ~> UniqDist

Include all columns in an aggregate

This is a generic aggregate pattern that demonstrates how to keep the remaining columns in your output metadata when you are building aggregates. In this case, we use the first() function to choose the first value in every column whose name is not "movie". To use this, create an Aggregate transformation called DistinctRows and then paste this into your script over the existing DistinctRows aggregate script.

aggregate(groupBy(movie),
    each(match(name!='movie'), $$ = first($$))) ~> DistinctRows

Create row hash fingerprint

Use this code in your data flow script to create a new derived column called DWhash that produces a sha1 hash of three columns.

derive(DWhash = sha1(Name,ProductNumber,Color)) ~> DWHash

You can also use the script below to generate a row hash using all columns present in your stream, without needing to name each column:

derive(DWhash = sha1(columns())) ~> DWHash

String_agg equivalent

This code acts like the T-SQL string_agg() function and aggregates string values into an array. You can then cast that array into a string to use with SQL destinations.

source1 aggregate(groupBy(year),
    string_agg = collect(title)) ~> Aggregate1
Aggregate1 derive(string_agg = toString(string_agg)) ~> StringAgg

Count number of updates, upserts, inserts, deletes

When using an Alter Row transformation, you may want to count the number of updates, upserts, inserts, and deletes that result from your Alter Row policies. Add an Aggregate transformation after your Alter Row and paste this data flow script into the aggregate definition for those counts.

aggregate(updates = countIf(isUpdate(), 1),
        inserts = countIf(isInsert(), 1),
        upserts = countIf(isUpsert(), 1),
        deletes = countIf(isDelete(),1)) ~> RowCount

Distinct row using all columns

This snippet adds a new Aggregate transformation to your data flow that takes all incoming columns, generates a hash that is used for grouping to eliminate duplicates, and then provides the first occurrence of each duplicate as output. You do not need to explicitly name the columns; they are automatically generated from your incoming data stream.

aggregate(groupBy(mycols = sha2(256,columns())),
    each(match(true()), $$ = first($$))) ~> DistinctRows

Check for NULLs in all columns

This is a snippet that you can paste into your data flow to generically check all of your columns for NULL values. This technique uses schema drift to look through all columns in all rows, and it uses a Conditional Split to separate the rows with NULLs from the rows with no NULLs.

split(contains(array(columns()),isNull(#item)),
    disjoint: false) ~> LookForNULLs@(hasNULLs, noNULLs)
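
As a minimal follow-up sketch (the sink stream names hasNULLRows and noNULLRows are hypothetical), the two outputs of the split can then be routed to separate sinks:

LookForNULLs@hasNULLs sink(allowSchemaDrift: true,
    validateSchema: false) ~> hasNULLRows
LookForNULLs@noNULLs sink(allowSchemaDrift: true,
    validateSchema: false) ~> noNULLRows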

AutoMap schema drift with a select

When you need to load an existing database schema from an unknown or dynamic set of incoming columns, you must map the right-side columns in the Sink transformation. This is only needed when you are loading an existing table. Add this snippet before your Sink to create a Select that auto-maps your columns. Leave your Sink mapping set to auto-map.

select(mapColumn(
        each(match(true()))
    ),
    skipDuplicateMapInputs: true,
    skipDuplicateMapOutputs: true) ~> automap
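
As a minimal sketch of the downstream step (the sink stream name sink1 is hypothetical, and its mapping is assumed to be left on auto-map), the sink then simply reads from the automap stream:

automap sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1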

Persist column data types

Add this script inside a Derived Column definition to store the column names and data types from your data flow in a persistent store by way of a sink.

derive(each(match(type=='string'), $$ = 'string'),
    each(match(type=='integer'), $$ = 'integer'),
    each(match(type=='short'), $$ = 'short'),
    each(match(type=='complex'), $$ = 'complex'),
    each(match(type=='array'), $$ = 'array'),
    each(match(type=='float'), $$ = 'float'),
    each(match(type=='date'), $$ = 'date'),
    each(match(type=='timestamp'), $$ = 'timestamp'),
    each(match(type=='boolean'), $$ = 'boolean'),
    each(match(type=='double'), $$ = 'double')) ~> DerivedColumn1
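
A minimal sketch of routing that metadata to storage follows; the sink stream name columnTypesSink is hypothetical, and the sink's dataset is whatever persistent store you want to write to:

DerivedColumn1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> columnTypesSink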

Fill down

Here is how to implement the common "fill down" problem with data sets, where you want to replace NULL values with the value from the previous non-NULL value in the sequence. Note that this operation can have negative performance implications, because you must create a synthetic window across your entire data set with a "dummy" category value. Additionally, you must sort by a value to create the proper data sequence to find the previous non-NULL value. The snippet below creates the synthetic category as "dummy" and sorts by a surrogate key. You can remove the surrogate key and use your own data-specific sort key; a variant is sketched after the snippet. This code snippet assumes you've already added a Source transformation called source1.

source1 derive(dummy = 1) ~> DerivedColumn
DerivedColumn keyGenerate(output(sk as long),
    startAt: 1L) ~> SurrogateKey
SurrogateKey window(over(dummy),
    asc(sk, true),
    Rating2 = coalesce(Rating, last(Rating, true()))) ~> Window1
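
As a minimal variant sketch, assuming your data already contains a column that defines the correct ordering (the column name eventTime here is hypothetical), you can drop the surrogate key and sort the window on that column instead:

source1 derive(dummy = 1) ~> DerivedColumn
DerivedColumn window(over(dummy),
    asc(eventTime, true),
    Rating2 = coalesce(Rating, last(Rating, true()))) ~> Window1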

Next steps

Explore data flows by starting with the data flows overview article.