Migrate normalized database schema from Azure SQL Database to Azure CosmosDB denormalized container

This guide explains how to take an existing normalized database schema in Azure SQL Database and convert it into a denormalized schema for loading into Azure CosmosDB.

SQL schemas are typically modeled using third normal form, resulting in normalized schemas that provide high levels of data integrity and fewer duplicate data values. Queries can join entities together across tables for reading. CosmosDB is optimized for extremely fast transactions and for querying within a collection or container via denormalized schemas, with data self-contained inside a document.

Using Azure Data Factory, we'll build a pipeline that uses a single Mapping Data Flow to read from two normalized Azure SQL Database tables that contain primary and foreign keys as the entity relationship. ADF will join those tables into a single stream using the data flow Spark engine, collect the joined rows into arrays, and produce individual cleansed documents to insert into a new Azure CosmosDB container.

This guide will build a new container on the fly called "orders" that uses the SalesOrderHeader and SalesOrderDetail tables from the standard SQL Server AdventureWorks sample database. Those tables represent sales transactions joined by SalesOrderID. Each unique detail record has its own primary key, SalesOrderDetailID. The relationship between header and detail is 1:M. We'll join on SalesOrderID in ADF and then roll each related detail record into an array called "details".

The representative SQL query for this guide is:

  SELECT
    o.SalesOrderID,
    o.OrderDate,
    o.Status,
    o.ShipDate,
    o.SalesOrderNumber,
    o.ShipMethod,
    o.SubTotal,
    (SELECT SalesOrderDetailID, UnitPrice, OrderQty
       FROM SalesLT.SalesOrderDetail od
       WHERE od.SalesOrderID = o.SalesOrderID
       FOR JSON AUTO) AS OrderDetails
  FROM SalesLT.SalesOrderHeader o;

The resulting CosmosDB container will embed the inner query into a single document and look like this:

Collection
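To make the target shape concrete, here is a minimal sketch of what one denormalized "orders" document could look like. The field names come from the SQL query above; the values are invented for illustration and are not actual AdventureWorks data.

```python
# Illustrative shape of one denormalized "orders" document.
# Field values are made up; real values come from AdventureWorks.
order_doc = {
    "SalesOrderID": 71774,
    "OrderDate": "2008-06-01T00:00:00Z",
    "Status": 5,
    "ShipDate": "2008-06-08T00:00:00Z",
    "SalesOrderNumber": "SO71774",
    "ShipMethod": "CARGO TRANSPORT 5",
    "SubTotal": 880.35,
    "details": [
        {"SalesOrderDetailID": 110562, "UnitPrice": 356.90, "OrderQty": 1},
        {"SalesOrderDetailID": 110563, "UnitPrice": 356.90, "OrderQty": 1},
    ],
}

# Every detail row sharing the order's SalesOrderID is embedded in the
# "details" array instead of living in a separate joined table.
print(len(order_doc["details"]))  # 2
```

The point of the shape is that a query against the container never needs a cross-table join: one read returns the header and all of its line items.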

Create a pipeline

  1. Select +New Pipeline to create a new pipeline.

  2. Add a data flow activity.

  3. In the data flow activity, select New mapping data flow.

  4. We will construct the data flow graph shown below:

Data flow graph

  5. Define the source for "SourceOrderDetails". For the dataset, create a new Azure SQL Database dataset that points to the SalesOrderDetail table.

  6. Define the source for "SourceOrderHeader". For the dataset, create a new Azure SQL Database dataset that points to the SalesOrderHeader table.

  7. On the top source, add a Derived Column transformation after "SourceOrderDetails". Call the new transformation "TypeCast". We need to round the UnitPrice column and cast it to a double data type for CosmosDB. Set the formula to: toDouble(round(UnitPrice,2)).

  8. Add another derived column and call it "MakeStruct". This is where we will create a hierarchical structure to hold the values from the details table. Remember, details has an M:1 relation to header. Name the new structure orderdetailsstruct and create the hierarchy in this way, setting each subcolumn to the incoming column name:

Create structure
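As a rough mental model of what the "TypeCast" and "MakeStruct" steps do to each detail row, here is a hedged Python sketch. The ADF expressions toDouble(round(UnitPrice,2)) and the struct nesting are approximated with plain Python; the function name is ours, not part of ADF.

```python
def type_cast_and_make_struct(detail_row: dict) -> dict:
    """Approximate the TypeCast and MakeStruct steps for one detail row.

    TypeCast:   toDouble(round(UnitPrice, 2)) -> a rounded float.
    MakeStruct: nest the detail columns under 'orderdetailsstruct',
                keeping SalesOrderID at the top level for the join.
    """
    unit_price = round(float(detail_row["UnitPrice"]), 2)
    return {
        "SalesOrderID": detail_row["SalesOrderID"],
        "orderdetailsstruct": {
            "SalesOrderDetailID": detail_row["SalesOrderDetailID"],
            "UnitPrice": unit_price,
            "OrderQty": detail_row["OrderQty"],
        },
    }

row = {"SalesOrderID": 1, "SalesOrderDetailID": 10,
       "UnitPrice": 356.898, "OrderQty": 2}
print(type_cast_and_make_struct(row)["orderdetailsstruct"]["UnitPrice"])  # 356.9
```

Nesting the detail columns into a single struct now is what lets a later aggregate collect each row into one array element.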

  9. Now, let's go to the sales header source. Add a Join transformation. For the right side, select "MakeStruct". Leave it set to inner join and choose SalesOrderID for both sides of the join condition.

  10. Click on the Data Preview tab in the new join that you added so that you can see your results up to this point. You should see all of the header rows joined with the detail rows. This is the result of the join formed on SalesOrderID. Next, we'll combine the details from the common rows into the details struct and aggregate the common rows.

Join
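What the Data Preview shows at this point can be sketched as a plain inner join: one output row per matching header/detail pair, so a header with two details appears twice. The data below is illustrative only, not AdventureWorks rows.

```python
# Emulate the Join transformation: inner-join header rows to the
# struct rows on SalesOrderID (illustrative data).
headers = [
    {"SalesOrderID": 1, "SalesOrderNumber": "SO1"},
    {"SalesOrderID": 2, "SalesOrderNumber": "SO2"},
]
details = [
    {"SalesOrderID": 1, "orderdetailsstruct": {"SalesOrderDetailID": 10}},
    {"SalesOrderID": 1, "orderdetailsstruct": {"SalesOrderDetailID": 11}},
    {"SalesOrderID": 2, "orderdetailsstruct": {"SalesOrderDetailID": 20}},
]

joined = [
    {**h, **d}
    for h in headers
    for d in details
    if h["SalesOrderID"] == d["SalesOrderID"]
]

# Order 1 has two details, order 2 has one, so three rows come out --
# the duplicated header columns are what the aggregate step collapses.
print(len(joined))  # 3
```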

  11. Before we can create the arrays to denormalize these rows, we first need to remove unwanted columns and make sure the data values match CosmosDB data types.

  12. Add a Select transformation next and set the field mapping to look like this:

Column scrubber

  13. Now let's again cast a currency column, this time TotalDue. As in step 7 above, set the formula to: toDouble(round(TotalDue,2)).

  14. Here's where we will denormalize the rows by grouping on the common key SalesOrderID. Add an Aggregate transformation and set the group-by to SalesOrderID.

  15. In the aggregate formula, add a new column called "details" and use this formula to collect the values in the orderdetailsstruct structure that we created earlier: collect(orderdetailsstruct).

  16. The aggregate transformation only outputs columns that are part of an aggregate or group-by formula, so we also need to include the columns from the sales header. To do that, add a column pattern in that same aggregate transformation. This pattern will include all other columns in the output:

instr(name,'OrderQty')==0 && instr(name,'UnitPrice')==0 && instr(name,'SalesOrderID')==0

  17. Use the "this" syntax in the other properties so that we maintain the same column names, and use the first() function as the aggregate:

Aggregate
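The aggregate step above (group by SalesOrderID, collect() the structs into "details", and carry the remaining header columns through with first()) can be sketched in Python. This is an approximation of the data flow semantics, not ADF code; the function name and sample rows are ours.

```python
from itertools import groupby
from operator import itemgetter

def aggregate_orders(joined_rows):
    """Approximate the Aggregate transformation: group by SalesOrderID,
    collect() the structs into a 'details' array, and keep the first()
    value of every other column (the column-pattern step)."""
    out = []
    keyfn = itemgetter("SalesOrderID")
    for sales_order_id, group in groupby(sorted(joined_rows, key=keyfn), keyfn):
        rows = list(group)
        doc = {
            "SalesOrderID": sales_order_id,
            "details": [r["orderdetailsstruct"] for r in rows],
        }
        # Column pattern + first(): every column except the grouped key
        # and the collected struct, taken from the group's first row.
        for name, value in rows[0].items():
            if name not in ("SalesOrderID", "orderdetailsstruct"):
                doc[name] = value
        out.append(doc)
    return out

joined = [
    {"SalesOrderID": 1, "SalesOrderNumber": "SO1",
     "orderdetailsstruct": {"SalesOrderDetailID": 10}},
    {"SalesOrderID": 1, "SalesOrderNumber": "SO1",
     "orderdetailsstruct": {"SalesOrderDetailID": 11}},
]
docs = aggregate_orders(joined)
print(len(docs), len(docs[0]["details"]))  # 1 2
```

Two joined rows for the same order collapse into one document whose "details" array holds both structs, which is exactly the 1:M denormalization the container needs.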

  18. We're ready to finish the migration flow by adding a sink transformation. Click "New" next to the dataset and add a CosmosDB dataset that points to your CosmosDB database. For the collection, we'll call it "orders"; it will have no schema and no documents because it will be created on the fly.

  19. In Sink Settings, set the Partition Key to \SalesOrderID and the collection action to "recreate". Make sure your Mapping tab looks like this:

Screenshot shows the Mapping tab.

  20. Click on Data Preview to make sure that you see these 32 rows set to be inserted as new documents into your new container:

Screenshot shows the Data Preview tab.

If everything looks good, you are now ready to create a new pipeline, add this data flow activity to that pipeline, and execute it. You can execute it from debug or from a triggered run. After a few minutes, you should have a new denormalized container of orders called "orders" in your CosmosDB database.

Next steps