使用映射数据流转换 delta lake 中的数据
适用于: Azure 数据工厂 Azure Synapse Analytics
提示
试用 Microsoft Fabric 中的数据工厂,这是一种适用于企业的一站式分析解决方案。 Microsoft Fabric 涵盖从数据移动到数据科学、实时分析、商业智能和报告的所有内容。 了解如何免费开始新的试用!
如果对 Azure 数据工厂不熟悉,请参阅 Azure 数据工厂简介。
在本教程中,你将使用数据流画布来创建数据流,以便分析和转换 Azure Data Lake Storage (ADLS) Gen2 中的数据,并将其存储在 Delta Lake 中。
先决条件
- Azure 订阅。 如果没有 Azure 订阅,请在开始前创建一个试用 Azure 帐户。
- Azure 存储帐户。 将 ADLS 存储用作“源”和“接收器”数据存储 。 如果没有存储帐户,请参阅创建 Azure 存储帐户以获取创建步骤。
在本教程中,我们要转换的文件是 MoviesDB.csv,可在此处找到。 若要从 GitHub 检索该文件,请将内容复制到所选的文本编辑器,在本地保存为 .csv 文件。 若要将文件上传到存储帐户,请参阅使用 Azure 门户上传 Blob。 这些示例引用名为“sample-data”的容器。
创建数据工厂
在此步骤中,请先创建数据工厂,然后打开数据工厂 UX,在该数据工厂中创建一个管道。
打开 Microsoft Edge 或 Google Chrome。 目前,仅 Microsoft Edge 和 Google Chrome Web 浏览器支持数据工厂 UI。
在左侧菜单中,选择“创建资源”>“数据 + 分析”>“数据工厂”。
在“新建数据工厂”页面上,在“名称”下输入“ADFTutorialDataFactory” 。
选择要在其中创建数据工厂的 Azure 订阅。
对于“资源组”,请执行以下步骤之一:
a. 选择“使用现有资源组”,并从下拉列表选择现有的资源组。
b. 选择“新建”,并输入资源组的名称。
若要了解资源组,请参阅使用资源组管理 Azure 资源。
在“位置”下选择数据工厂所在的位置。 下拉列表中仅显示支持的位置。 数据工厂使用的数据存储(例如 Azure 存储和 SQL 数据库)和计算资源(例如 Azure HDInsight)可以位于其他区域。
选择“创建” 。
创建完成后,通知中心内会显示通知。 选择“转到资源”导航到“数据工厂”页。
选择“创作和监视”,在单独的选项卡中启动数据工厂 UI。
创建包含数据流活动的管道
在此步骤中,要创建一个包含数据流活动的管道。
在主页上,选择“协调”。
在管道的“常规”选项卡中,输入“DeltaLake”作为管道的名称。
在“活动”窗格中,展开“移动和转换”可折叠部分 。 将“数据流”活动从该窗格拖放到管道画布上。
在“添加数据流”弹出窗口中,选择“创建新数据流”,然后将数据流命名为“DeltaLake”。 完成操作后,请选择“完成”。
在管道画布的顶部栏中,将“数据流调试”滑块滑动到打开。 调试模式允许针对实时 Spark 群集进行转换逻辑的交互式测试。 数据流群集需要 5-7 分钟才能预热,如果用户计划进行数据流开发,建议先打开调试。 有关详细信息,请参阅调试模式。
在数据流画布中构建转换逻辑
在本教程中,要生成两个数据流。 第一个数据流是一个简单的源到接收器数据流,用来基于电影 CSV 文件生成新的 Delta Lake。 最后,创建流设计来更新 Delta Lake 中的数据。
教程目标
- 使用先决条件中的 MoviesCSV 数据集源,并通过其形成新的 Delta Lake。
- 构建相应的逻辑,用以将 1988 年电影的评级更新为“1”。
- 删除 1950 年的所有电影。
- 通过复制 1960 年的电影为 2021 年插入新电影。
从一个空白的数据流画布开始
选择数据流编辑器窗口顶部的源转换,然后在“源设置”窗口中选择“数据集”属性旁边的“+ 新建”:
从显示的“新数据集”窗口中选择 Azure Data Lake Storage Gen2,然后选择“继续”。
“数据集类型”选择 DelimitedText,然后再次选择“继续”。
将数据集命名为“MoviesCSV”,然后选择“链接服务”下的“+ 新建”,在文件中创建新的链接服务。
提供之前在“先决条件”部分创建的存储帐户的详细信息,然后浏览并选择在那里上传的 MoviesCSV 文件。
添加链接服务后,选中“第一行作为标题”复选框,然后选择“确定”以添加源。
导航到数据流设置窗口的“投影”选项卡,然后选择“检测数据类型”。
现在,在数据流编辑器窗口中选择“源”后的 +,向下滚动,在“目标”部分下选择“接收器”,向数据流添加新接收器。
在添加接收器后显示的接收器设置的“接收器”选项卡中,为“接收器类型”选择“内联”,然后为“内联数据集类型”选择“增量”。 然后,为“链接服务”选择 Azure Data Lake Storage Gen2。
在存储容器中选择你希望该服务在其中创建 Delta Lake 的文件夹名称。
最后,返回管道设计器,然后选择“调试”以在画布上仅有此数据流活动的情况在调试模式下执行管道。 这会在 Azure Data Lake Storage Gen2 中生成新的 Delta Lake。
现在,从屏幕左侧的“工厂资源”菜单中,选择 + 以添加新资源,然后选择“数据流”。
和前面一样,再次选择 MoviesCSV 文件作为源,然后从“投影”选项卡中再次选择“检测数据类型”。
这一次,在创建源后,在数据流编辑器窗口中选择 +,并在源中添加筛选器转换。
在“筛选设置”窗口中添加一个“筛选条件”,用于仅显示匹配 1950、1960 和 1988 年的电影行。
现在添加“派生列”转换,将每部 1988 电影的分级更新为“1”。
Update, insert, delete, and upsert
策略是在“更改行”转换中创建的。 在你的派生列后添加一个“更改行”转换。你的“更改行”策略应如下所示。
现在已为每个“更改行”类型设置了正确的策略,接下来检查是否已在接收器转换上设置了正确的更新规则
此处我们将 Delta Lake 接收器用于 Azure Data Lake Storage Gen2 数据湖,并允许插入、更新、删除。
请注意,“键列”是一个组合键,由“电影主键”列和“年份”列组成。 这是因为我们通过复制 1960 年的行创建了虚构的 2021 年电影。 此组合键提供了唯一性,从而避免了在查找现有行时出现冲突。
下载完整的示例
下面是一个用于 Delta 管道的示例解决方案,其中包含用于更新/删除 Lake 中的行的数据流。
相关内容
详细了解数据流表达式语言。