Transform data using mapping data flows

APPLIES TO: Azure Data Factory, Azure Synapse Analytics

If you're new to Azure Data Factory, see Introduction to Azure Data Factory.

In this tutorial, you'll use the Azure Data Factory user interface (UX) to create a pipeline that copies and transforms data from an Azure Data Lake Storage (ADLS) Gen2 source to an ADLS Gen2 sink using mapping data flow. The configuration pattern in this tutorial can be expanded upon when transforming data using mapping data flow.

In this tutorial, you do the following steps:

  • Create a data factory.
  • Create a pipeline with a Data Flow activity.
  • Build a mapping data flow with four transformations.
  • Test run the pipeline.
  • Monitor a Data Flow activity.

Prerequisites

  • Azure subscription. If you don't have an Azure subscription, create a trial Azure account before you begin.
  • Azure storage account. You use ADLS storage as source and sink data stores. If you don't have a storage account, see Create an Azure storage account for steps to create one.

The file that we're transforming in this tutorial is MoviesDB.csv, which can be found here. To retrieve the file from GitHub, copy the contents to a text editor of your choice and save it locally as a .csv file. To upload the file to your storage account, see Upload blobs with the Azure portal. The examples reference a container named 'sample-data'.

Create a data factory

In this step, you create a data factory and open the Data Factory UX to create a pipeline in the data factory.

  1. Open Microsoft Edge or Google Chrome. Currently, the Data Factory UI is supported only in the Microsoft Edge and Google Chrome web browsers.

  2. On the left menu, select Create a resource > Data + Analytics > Data Factory:

    Selection of Data Factory in the New pane

  3. On the New data factory page, under Name, enter ADFTutorialDataFactory.

    The name of the Azure data factory must be globally unique. If you receive an error message about the name value, enter a different name for the data factory (for example, yournameADFTutorialDataFactory). For naming rules for Data Factory artifacts, see Data Factory naming rules.

    New data factory

  4. Select the Azure subscription in which you want to create the data factory.

  5. For Resource Group, take one of the following steps:

    a. Select Use existing, and select an existing resource group from the drop-down list.

    b. Select Create new, and enter the name of a resource group.

    To learn about resource groups, see Use resource groups to manage your Azure resources.

  6. Under Location, select a location for the data factory. Only supported locations are displayed in the drop-down list. Data stores (for example, Azure Storage and SQL Database) and computes (for example, Azure HDInsight) used by the data factory can be in other regions.

  7. Select Create.

  8. After the creation is finished, you see the notice in the Notifications center. Select Go to resource to navigate to the Data factory page.

  9. Select Author & Monitor to launch the Data Factory UI in a separate tab.

Create a pipeline with a Data Flow activity

In this step, you'll create a pipeline that contains a Data Flow activity.

  1. On the Let's get started page, select Create pipeline.

    Create pipeline

  2. In the General tab for the pipeline, enter TransformMovies for the Name of the pipeline.

  3. In the factory top bar, slide the Data Flow debug slider on. Debug mode allows for interactive testing of transformation logic against a live Spark cluster. Data Flow clusters take 5-7 minutes to warm up, so we recommend turning on debug first if you plan to do Data Flow development. For more information, see Debug Mode.

    Data Flow activity

  4. In the Activities pane, expand the Move and Transform accordion. Drag and drop the Data Flow activity from the pane onto the pipeline canvas.

    Screenshot that shows the pipeline canvas where you can drop the Data Flow activity.

  5. In the Adding Data Flow pop-up, select Create new Data Flow and then name your data flow TransformMovies. Click Finish when done.

    Screenshot that shows where to name your data flow when you create a new data flow.

Build transformation logic in the data flow canvas

Once you create your data flow, you'll be automatically sent to the data flow canvas. In this step, you'll build a data flow that takes the moviesDB.csv in ADLS storage and aggregates the average rating of comedies from 1910 to 2000. You'll then write this file back to ADLS storage.

  1. In the data flow canvas, add a source by clicking the Add Source box.

    Screenshot that shows the Add Source box.

  2. Name your source MoviesDB. Click New to create a new source dataset.

    Screenshot that shows where to select New after naming your source.

  3. Choose Azure Data Lake Storage Gen2. Click Continue.

    Screenshot that shows the Azure Data Lake Storage Gen2 tile.

  4. Choose DelimitedText. Click Continue.

    Screenshot that shows the DelimitedText tile.

  5. Name your dataset MoviesDB. In the linked service dropdown, choose New.

    Screenshot that shows the Linked service dropdown.

  6. In the linked service creation screen, name your ADLS Gen2 linked service ADLSGen2 and specify your authentication method. Then enter your connection credentials. In this tutorial, we're using Account key to connect to our storage account. You can click Test connection to verify that your credentials were entered correctly. Click Create when finished.

    Linked service

  7. Once you're back at the dataset creation screen, enter where your file is located under the File path field. In this tutorial, the file moviesDB.csv is located in the container sample-data. Because the file has headers, check First row as header. Select From connection/store to import the header schema directly from the file in storage. Click OK when done.

    Dataset

  8. If your debug cluster has started, go to the Data Preview tab of the source transformation and click Refresh to get a snapshot of the data. You can use data preview to verify that your transformation is configured correctly.

    Screenshot that shows where you can preview your data to verify that your transformation is configured correctly.

  9. Next to your source node on the data flow canvas, click the plus icon to add a new transformation. The first transformation you're adding is a Filter.

    Data flow canvas

  10. Name your filter transformation FilterYears. Click the expression box next to Filter on to open the expression builder. Here you'll specify your filtering condition.

    Screenshot that shows the Filter on expression box.

  11. The data flow expression builder lets you interactively build expressions to use in various transformations. Expressions can include built-in functions, columns from the input schema, and user-defined parameters. For more information on how to build expressions, see Data flow expression builder.

    In this tutorial, you want to filter movies of genre comedy that came out between the years 1910 and 2000. Because year is currently a string, you need to convert it to an integer using the toInteger() function. Use the greater than or equal to (>=) and less than or equal to (<=) operators to compare against the literal year values 1910 and 2000. Combine these expressions with the and (&&) operator. The expression comes out as:

    toInteger(year) >= 1910 && toInteger(year) <= 2000

    To find which movies are comedies, you can use the rlike() function to find the pattern 'Comedy' in the column genres. Combine the rlike expression with the year comparison to get:

    toInteger(year) >= 1910 && toInteger(year) <= 2000 && rlike(genres, 'Comedy')

    If you have a debug cluster active, you can verify your logic by clicking Refresh to see the expression output compared against the inputs used. There's more than one right answer for how to accomplish this logic using the data flow expression language.

    Filter

    Click Save and Finish once you're done with your expression.
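    The filter condition above can be sketched outside the service with a few lines of Python. This is only an illustration of the logic, not the Data Flow runtime, and the sample rows below are made up:

    ```python
    import re

    # A few made-up rows shaped like MoviesDB.csv (year and Rating arrive as strings).
    rows = [
        {"title": "A", "year": "1995", "genres": "Comedy|Drama", "Rating": "7"},
        {"title": "B", "year": "2005", "genres": "Comedy", "Rating": "8"},
        {"title": "C", "year": "1950", "genres": "Horror", "Rating": "6"},
    ]

    def keep(row):
        # Mirrors: toInteger(year) >= 1910 && toInteger(year) <= 2000 && rlike(genres, 'Comedy')
        year = int(row["year"])
        return 1910 <= year <= 2000 and re.search("Comedy", row["genres"]) is not None

    filtered = [r for r in rows if keep(r)]
    print([r["title"] for r in filtered])  # ['A'] — only row A passes both checks
    ```

    Row B fails the year range and row C fails the genre pattern, which matches what the Data Preview tab would show for similar input.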

  12. Fetch a data preview to verify that the filter is working correctly.

    Screenshot that shows the data preview that you fetched.

  13. The next transformation you'll add is an Aggregate transformation under Schema modifier.

    Screenshot that shows the Aggregate schema modifier.

  14. Name your aggregate transformation AggregateComedyRatings. In the Group by tab, select year from the dropdown to group the aggregations by the year the movie came out.

    Screenshot that shows the year option in the Group by tab under Aggregate Settings.

  15. Go to the Aggregates tab. In the left text box, name the aggregate column AverageComedyRating. Click the right expression box to enter the aggregate expression via the expression builder.

    Screenshot that shows the Aggregates tab under Aggregate Settings.

  16. To get the average of the Rating column, use the avg() aggregate function. Because Rating is a string and avg() takes a numerical input, we must convert the value to a number via the toInteger() function. The expression looks like:

    avg(toInteger(Rating))

    Click Save and Finish when done.

    Screenshot that shows the saved expression.

  17. Go to the Data Preview tab to view the transformation output. Notice that only two columns are there: year and AverageComedyRating.

    Aggregate
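    The grouped average can likewise be sketched in Python to see why only two columns (year and AverageComedyRating) come out of the aggregate. The (year, Rating) pairs below are made up for illustration:

    ```python
    from collections import defaultdict

    # Made-up (year, Rating) string pairs, as they would arrive from the filter.
    filtered = [("1995", "7"), ("1995", "8"), ("1977", "9")]

    # Mirrors the Aggregate transformation: group by year, then avg(toInteger(Rating)).
    ratings_by_year = defaultdict(list)
    for year, rating in filtered:
        ratings_by_year[year].append(int(rating))

    result = {year: sum(vals) / len(vals) for year, vals in ratings_by_year.items()}
    print(result)  # {'1995': 7.5, '1977': 9.0}
    ```

    Every input column other than the group-by key and the aggregate output is dropped, which is why the preview shows exactly two columns.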

  18. Next, you want to add a Sink transformation under Destination.

    Screenshot that shows where to add a sink transformation under Destination.

  19. Name your sink Sink. Click New to create your sink dataset.

    Screenshot that shows where you can name your sink and create a new sink dataset.

  20. Choose Azure Data Lake Storage Gen2. Click Continue.

    Screenshot that shows the Azure Data Lake Storage Gen2 tile you can choose.

  21. Choose DelimitedText. Click Continue.

    Dataset

  22. Name your sink dataset MoviesSink. For the linked service, choose the ADLS Gen2 linked service you created in step 6. Enter an output folder to write your data to. In this tutorial, we're writing to the folder 'output' in the container 'sample-data'. The folder doesn't need to exist beforehand and can be dynamically created. Set First row as header to true and select None for Import schema. Click Finish.

    Sink

Now you've finished building your data flow. You're ready to run it in your pipeline.

Running and monitoring the Data Flow

You can debug a pipeline before you publish it. In this step, you're going to trigger a debug run of the data flow pipeline. While data preview doesn't write data, a debug run will write data to your sink destination.

  1. Go to the pipeline canvas. Click Debug to trigger a debug run.

    Screenshot that shows the pipeline canvas with Debug highlighted.

  2. Pipeline debug of Data Flow activities uses the active debug cluster but still takes at least a minute to initialize. You can track the progress via the Output tab. Once the run is successful, click the eyeglasses icon to open the monitoring pane.

    Pipeline

  3. In the monitoring pane, you can see the number of rows and the time spent in each transformation step.

    Screenshot that shows the monitoring pane where you can see the number of rows and the time spent in each transformation step.

  4. Click a transformation to get detailed information about the columns and partitioning of the data.

    Monitor

If you followed this tutorial correctly, you should have written 83 rows and 2 columns into your sink folder. You can verify that the data is correct by checking your blob storage.
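If you download the written file from blob storage, a short Python helper can spot-check the row and column counts locally. The file name in the commented-out call is hypothetical; the service names its output part files itself:

```python
import csv

def summarize(path):
    """Return (data_row_count, column_count) for a CSV file with a header row."""
    with open(path, newline="") as f:
        reader = csv.reader(f)
        header = next(reader)
        rows = list(reader)
    return len(rows), len(header)

# Hypothetical local copy of the sink output; expect (83, 2) if the run matched this tutorial.
# print(summarize("part-00000.csv"))
```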

Next steps

The pipeline in this tutorial runs a data flow that aggregates the average rating of comedies from 1910 to 2000 and writes the data to ADLS. You learned how to:

  • Create a data factory.
  • Create a pipeline with a Data Flow activity.
  • Build a mapping data flow with four transformations.
  • Test run the pipeline.
  • Monitor a Data Flow activity.

Learn more about the data flow expression language.