映射数据流中的聚合转换Aggregate transformation in mapping data flow

适用于: Azure 数据工厂 Azure Synapse Analytics

聚合转换可以定义数据流中列的聚合。The Aggregate transformation defines aggregations of columns in your data streams. 使用表达式生成器可以定义不同类型的聚合,如 SUM、MIN、MAX 和 COUNT,并按现有列或计算列进行分组。Using the Expression Builder, you can define different types of aggregations such as SUM, MIN, MAX, and COUNT grouped by existing or computed columns.

Group byGroup by

选择现有列或创建新的计算列,以用作聚合的 group by 子句。Select an existing column or create a new computed column to use as a group by clause for your aggregation. 若要使用现有列,请从下拉列表中选择它。To use an existing column, select it from the dropdown. 若要创建新的计算列,请将鼠标悬停在子句上,并单击“计算列”。To create a new computed column, hover over the clause and click Computed column. 这将打开数据流表达式生成器This opens the data flow expression builder. 创建计算列后,请在“命名为”字段下输入输出列名称。Once you create your computed column, enter the output column name under the Name as field. 如果要添加其他 group by 子句,请将鼠标悬停在现有子句上,然后单击加号图标。If you wish to add an additional group by clause, hover over an existing clause and click the plus icon.

聚合转换分组依据设置Aggregate transformation group by settings

在聚合转换中,group by 子句是可选的。A group by clause is optional in an Aggregate transformation.

聚合列Aggregate columns

转到“聚合”选项卡以生成聚合表达式。Go to the Aggregates tab to build aggregation expressions. 可以使用聚合覆盖现有列,也可以使用新名称创建新字段。You can either overwrite an existing column with an aggregation, or create a new field with a new name. 在列名选择器旁的右侧框中输入聚合表达式。The aggregation expression is entered in the right-hand box next to the column name selector. 若要编辑表达式,请单击文本框并打开表达式生成器。To edit the expression, click on the text box and open the expression builder. 若要添加更多聚合列,请单击列列表上方的“添加”或现有聚合列旁边的加号图标。To add more aggregate columns, click on Add above the column list or the plus icon next to an existing aggregate column. 选择“添加列”或“添加列模式”。Choose either Add column or Add column pattern. 每个聚合表达式必须至少包含一个聚合函数。Each aggregation expression must contain at least one aggregate function.

聚合设置Aggregate settings

备注

在“调试”模式下,表达式生成器无法使用聚合函数生成数据预览。In Debug mode, the expression builder cannot produce data previews with aggregate functions. 若要查看聚合转换的数据预览,请关闭表达式生成器,然后通过“数据预览”选项卡查看数据。To view data previews for aggregate transformations, close the expression builder and view the data via the 'Data Preview' tab.

列模式Column patterns

使用列模式将相同的聚合应用于一组列。Use column patterns to apply the same aggregation to a set of columns. 如果希望保留输入架构中的许多列(默认情况下会将其删除),则此功能很有用。This is useful if you wish to persist many columns from the input schema as they are dropped by default. 使用诸如 first() 之类的试探法来通过聚合保留输入列。Use a heuristic such as first() to persist input columns through the aggregation.

重新连接行和列Reconnect rows and columns

聚合转换类似于 SQL 聚合 select 查询。Aggregate transformations are similar to SQL aggregate select queries. 未包含在 group by 子句或聚合函数中的列不会流向聚合转换的输出。Columns that aren't included in your group by clause or aggregate functions won't flow through to the output of your aggregate transformation. 如果希望在聚合输出中包括其他列,请执行以下方法之一:If you wish to include other columns in your aggregated output, do one of the following methods:

  • 使用聚合函数(如 last()first())来包含该其他列。Use an aggregate function such as last() or first() to include that additional column.
  • 使用自联接模式将列重新联接到输出流。Rejoin the columns to your output stream using the self join pattern.

删除重复的行Removing duplicate rows

聚合转换的常见用途是删除或标识源数据中的重复条目。A common use of the aggregate transformation is removing or identifying duplicate entries in source data. 此过程称为重复数据删除。This process is known as deduplication. 基于一组分组依据键,使用你选择的试探法确定要保留的重复行。Based upon a set of group by keys, use a heuristic of your choosing to determine which duplicate row to keep. 常见试探法有 first()last()max()min()Common heuristics are first(), last(), max(), and min(). 使用列模式将规则应用到除分组依据列之外的每一列。Use column patterns to apply the rule to every column except for the group by columns.

重复数据删除Deduplication

在上面的示例中,列 ProductIDName 用于分组。In the above example, columns ProductID and Name are being use for grouping. 如果这两列中有两行的值相同,则将其视为重复项。If two rows have the same values for those two columns, they're considered duplicates. 在此聚合转换中,将保留匹配的第一行的值,并删除所有其他行的值。In this aggregate transformation, the values of the first row matched will be kept and all others will be dropped. 使用列模式语法,将名称不是 ProductIDName 的所有列都映射到它们的现有列名,并为其指定第一个匹配行的值。Using column pattern syntax, all columns whose names aren't ProductID and Name are mapped to their existing column name and given the value of the first matched rows. 输出架构与输入架构相同。The output schema is the same as the input schema.

对于数据验证方案,count() 函数可用于计算有多少重复项。For data validation scenarios, the count() function can be used to count how many duplicates there are.

数据流脚本Data flow script

语法Syntax

<incomingStream>
    aggregate(
           groupBy(
                <groupByColumnName> = <groupByExpression1>,
                <groupByExpression2>
               ),
           <aggregateColumn1> = <aggregateExpression1>,
           <aggregateColumn2> = <aggregateExpression2>,
           each(
                match(matchExpression),
                <metadataColumn1> = <metadataExpression1>,
                <metadataColumn2> = <metadataExpression2>
               )
          ) ~> <aggregateTransformationName>

示例Example

下面的示例采用传入流 MoviesYear 并按列 year 对行分组。The below example takes an incoming stream MoviesYear and groups rows by column year. 该转换将创建一个汇总列 avgrating,其计算结果为列 Rating 的平均值。The transformation creates an aggregate column avgrating that evaluates to the average of column Rating. 此聚合转换名为 AvgComedyRatingsByYearThis aggregate transformation is named AvgComedyRatingsByYear.

在数据工厂 UX 中,此转换如下图所示:In the Data Factory UX, this transformation looks like the below image:

按示例分组Group by example

聚合示例Aggregate example

此转换的数据流脚本位于下面的代码片段中。The data flow script for this transformation is in the snippet below.

MoviesYear aggregate(
                groupBy(year),
                avgrating = avg(toInteger(Rating))
            ) ~> AvgComedyRatingByYear

聚合数据流脚本Aggregate data flow script

MoviesYear:用于定义年份和标题列的派生列 AvgComedyRatingByYear:按年份分组的喜剧的平均评分的聚合转换 avgrating:将为保存聚合值而创建的新列的名称MoviesYear: Derived Column defining year and title columns AvgComedyRatingByYear: Aggregate transformation for average rating of comedies grouped by year avgrating: Name of new column being created to hold the aggregated value

MoviesYear aggregate(groupBy(year),
    avgrating = avg(toInteger(Rating))) ~> AvgComedyRatingByYear

后续步骤Next steps