本指南介绍如何在Azure SQL Database中采用现有的规范化数据库架构,并将其转换为Azure Cosmos DB非规范化架构,以便加载到Azure Cosmos DB。
SQL 架构通常使用第三种普通窗体进行建模,从而生成规范化架构,这些架构提供高度的数据完整性和较少的重复数据值。 查询可跨表将实体联接在一起以进行读取。 Azure Cosmos DB针对超快速事务进行优化,并通过文档内自包含数据的非规范化架构在集合或容器中进行查询。
使用 Azure Data Factory,我们将创建一个管道,该管道使用单个映射数据流从两个包含主键和外键关系的 Azure SQL 数据库规范化表中读取。 数据工厂将使用数据流 Spark 引擎将这些表联接到单个流中,将联接的行收集到数组中,并生成单独的清理文档以插入到新的Azure Cosmos DB容器中。
本指南动态创建一个名为“orders”的新容器,该容器将使用标准 SQL Server Adventure Works 示例数据库中的 SalesOrderHeader 和 SalesOrderDetail 表。 这些表表示由 SalesOrderID 联接的销售交易。 每个唯一的详细信息记录都具有自己的主键 SalesOrderDetailID。 标头与详细信息之间的关系是 1:M。 我们在 ADF 中连接 SalesOrderID,然后将每个相关的详细记录汇总到一个名为“detail”的数组中。
本指南的典型 SQL 查询是:
SELECT
o.SalesOrderID,
o.OrderDate,
o.Status,
o.ShipDate,
o.SalesOrderNumber,
o.ShipMethod,
o.SubTotal,
(select SalesOrderDetailID, UnitPrice, OrderQty from SalesLT.SalesOrderDetail od where od.SalesOrderID = o.SalesOrderID for json auto) as OrderDetails
FROM SalesLT.SalesOrderHeader o;
生成的Azure Cosmos DB容器将内部查询嵌入到单个文档中,如下所示:
创建管道
选择“+新建管道”以创建新管道。
添加数据流活动
在数据流活动中,选择“新建映射数据流”。
构造此数据流图:
定义“SourceOrderDetails”的源。 对于数据集,请创建一个新的Azure SQL Database数据集,该数据集指向
SalesOrderDetail表。定义“SourceOrderHeader”的源。 对于数据集,请创建一个新的Azure SQL Database数据集,该数据集指向
SalesOrderHeader表。在源文件顶部,在“SourceOrderDetails”之后添加派生列转换。 将新转换称为“TypeCast”。 我们需要将
UnitPrice列舍入,并将其转换为Azure Cosmos DB的双数据类型。 将公式设置为:toDouble(round(UnitPrice,2))。添加另一个派生列并将其称为“MakeStruct”。 我们在此处创建一个层次结构来保存详细信息表中的值。 请记住,详细信息与标头有
M:1关系。 将新结构命名为orderdetailsstruct并以这种方式创建层次结构,将每个子列设置为传入列名:
接下来转到销售标头源。 添加联接转换。 对于右侧,选择“MakeStruct”。 将其设置为“内联”,然后在联接条件两侧均选择
SalesOrderID。在你添加的新联接中选择“数据预览”选项卡,这样便可查看目前结果。 应该会看到所有的标头行都与详细信息行结合显示。 这是由
SalesOrderID形成的连接结果。 接下来,我们将公共行中的详细信息合并到详细信息结构体中,并汇总公共行。
在创建数组来反规范这些行之前,首先需要删除不需要的列,并确保数据值与数据类型Azure Cosmos DB匹配。
接下来添加一个“选择转换”,并按如下所示设置字段映射:
现在,我们再次转换货币列,这次选择
TotalDue。 与上面的步骤 7 类似,将公式设置为:toDouble(round(TotalDue,2))。在这里,我们通过公共键
SalesOrderID分组来使行非规范化。 添加聚合转换,并将“分组依据”设置为SalesOrderID。在聚合公式中,添加一个名为“details”的新列,并使用此公式来收集之前创建的
orderdetailsstruct结构中的值:collect(orderdetailsstruct)。聚合转换将仅输出属于聚合或分组公式的列。 所以,我们还需要包含销售表头中的列。 为此,请在该聚合转换中添加一个列模式。 此模式包括输出中的所有其他列,不包括下面列出的列(OrderQty、UnitPrice、SalesOrderID):
instr(name,'OrderQty')==0&&instr(name,'UnitPrice')==0&&instr(name,'SalesOrderID')==0
在其他属性中使用“this”语法 ($$),以便我们维护相同的列名并使用
first()函数作为聚合。 这会告诉 ADF 保留找到的第一个匹配值:
现在可通过添加接收器转换来完成迁移流。 选择在数据集旁边的“新建”,然后添加一个指向您的 Azure Cosmos DB 数据库的 Azure Cosmos DB 数据集。 对于该集合,我们称其为“orders”,它不包含任何架构和文档,因为它将会动态创建。
在“汇入设置”中,将分区键设置为
/SalesOrderID并将集操作设置为“重新创建”。 请确保“映射”选项卡如下所示:
选择“数据预览”确保你看到这32行会作为新文档插入到你的新容器中。
如果一切正常,你现在可以创建新的管道,将此数据流活动添加到该管道并执行它。 可以从调试模式或触发的运行状态开始执行。 几分钟后,Azure Cosmos DB数据库中应有一个名为“orders”的新非规范化订单容器。
相关内容
- 使用映射数据流转换操作构建剩余的数据流逻辑。
- 下载本教程中已完成的管道模板,并将该模板导入到工厂。