Migrate one-to-few relational data into Azure Cosmos DB SQL API account

To migrate from a relational database to Azure Cosmos DB SQL API, you may need to change your data model for optimization.

One common transformation is denormalizing data by embedding related subitems within one JSON document. Here we look at a few options for doing this using Azure Data Factory or Azure Databricks. For general guidance on data modeling for Cosmos DB, review Data modeling in Azure Cosmos DB.

Example Scenario

Assume we have the following two tables in our SQL database, Orders and OrderDetails.

(Screenshot: the Orders and OrderDetails tables.)
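For reference, here is a minimal T-SQL sketch of the two tables, with column names inferred from the queries below (the data types are assumptions; your actual schema may differ):

CREATE TABLE Orders (
  OrderID int PRIMARY KEY,
  OrderDate datetime2,
  FirstName nvarchar(50),
  LastName nvarchar(50),
  Address nvarchar(100),
  City nvarchar(50),
  State nvarchar(50),
  PostalCode nvarchar(20),
  Country nvarchar(50),
  Phone nvarchar(30),
  Total money
);

CREATE TABLE OrderDetails (
  OrderDetailId int PRIMARY KEY,
  OrderId int REFERENCES Orders(OrderID),  -- many detail rows per order
  ProductId int,
  UnitPrice money,
  Quantity int
);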

We want to combine this one-to-few relationship into one JSON document during migration. To do this, we can create a T-SQL query using FOR JSON, as shown below:

SELECT
  o.OrderID,
  o.OrderDate,
  o.FirstName,
  o.LastName,
  o.Address,
  o.City,
  o.State,
  o.PostalCode,
  o.Country,
  o.Phone,
  o.Total,
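  -- correlated subquery: FOR JSON AUTO serializes each order's detail rows as a JSON array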
  (select OrderDetailId, ProductId, UnitPrice, Quantity from OrderDetails od where od.OrderId = o.OrderId for json auto) as OrderDetails
FROM Orders o;

The results of this query would look as below:

(Screenshot: the result set of the FOR JSON query.)
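For illustration, the combined document we are aiming for would look something like this (values are hypothetical); note that OrderDetails is a nested JSON array rather than separate rows:

{
  "OrderID": 1,
  "OrderDate": "2023-01-15T00:00:00",
  "FirstName": "Jane",
  "LastName": "Doe",
  "Address": "123 Main St",
  "City": "Seattle",
  "State": "WA",
  "PostalCode": "98101",
  "Country": "USA",
  "Phone": "555-0100",
  "Total": 59.98,
  "OrderDetails": [
    { "OrderDetailId": 1, "ProductId": 100, "UnitPrice": 19.99, "Quantity": 1 },
    { "OrderDetailId": 2, "ProductId": 200, "UnitPrice": 39.99, "Quantity": 1 }
  ]
}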

Ideally, you would use a single Azure Data Factory (ADF) copy activity to query SQL data as the source and write the output directly to an Azure Cosmos DB sink as proper JSON objects. Currently, it is not possible to perform the needed JSON transformation in one copy activity. If we try to copy the results of the above query into an Azure Cosmos DB SQL API container, we will see the OrderDetails field as a string property of our document, instead of the expected JSON array.

We can work around this current limitation in one of the following ways:

  • Use Azure Data Factory with two copy activities:

    1. Get JSON-formatted data from SQL to a text file in an intermediary blob storage location, and
    2. Load data from the JSON text file to a container in Azure Cosmos DB.
  • Use Azure Databricks to read from SQL and write to Azure Cosmos DB - we will present two options here.

Let's look at these approaches in more detail:

Azure Data Factory

Although we cannot embed OrderDetails as a JSON array in the destination Cosmos DB document, we can work around the issue by using two separate Copy Activities.

Copy Activity #1: SqlJsonToBlobText

For the source data, we use a SQL query to get the result set as a single column with one JSON object (representing the order) per row, using the SQL Server OPENJSON and FOR JSON PATH capabilities:

SELECT [value] FROM OPENJSON(
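  -- OPENJSON splits the array produced by FOR JSON PATH below into one row per order;
  -- the [value] column holds each order's JSON object as text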
  (SELECT
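    -- aliasing OrderID as id gives each document the id property Cosmos DB expects;
    -- without it, the Cosmos DB sink auto-generates a GUID id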
    id = o.OrderID,
    o.OrderDate,
    o.FirstName,
    o.LastName,
    o.Address,
    o.City,
    o.State,
    o.PostalCode,
    o.Country,
    o.Phone,
    o.Total,
    (select OrderDetailId, ProductId, UnitPrice, Quantity from OrderDetails od where od.OrderId = o.OrderId for json auto) as OrderDetails
   FROM Orders o FOR JSON PATH)
)
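Each row returned by this query is one order's complete JSON object as plain text, ready to be written out one per line. For illustration, trimmed to a few fields and with hypothetical values, the output looks like:

{"id":1,"OrderDate":"2023-01-15T00:00:00","Total":59.98,"OrderDetails":[{"OrderDetailId":1,"ProductId":100,"UnitPrice":19.99,"Quantity":1}]}
{"id":2,"OrderDate":"2023-01-16T00:00:00","Total":24.50,"OrderDetails":[{"OrderDetailId":3,"ProductId":300,"UnitPrice":24.50,"Quantity":1}]}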

(Screenshot: source settings of the SqlJsonToBlobText copy activity.)

For the sink of the SqlJsonToBlobText copy activity, we choose "Delimited Text" and point it to a specific folder in Azure Blob Storage with a dynamically generated unique file name (for example, @concat(pipeline().RunId,'.json')). Since our text file is not really "delimited" and we do not want it parsed into separate columns on commas, we set "Column delimiter" to a Tab ("\t") - or another character that does not occur in the data - and, to preserve the double quotes ("), we set "Quote character" to "No quote character".
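As a sketch, the sink dataset definition might look like the following (the dataset and linked service names are hypothetical, exact property names may vary with your ADF version, and the dynamic file name above would typically be wired in through a dataset parameter):

{
  "name": "OrdersJsonLines",
  "properties": {
    "type": "DelimitedText",
    "linkedServiceName": {
      "referenceName": "MyBlobStorageLinkedService",
      "type": "LinkedServiceReference"
    },
    "typeProperties": {
      "location": {
        "type": "AzureBlobStorageLocation",
        "container": "migration",
        "folderPath": "Orders"
      },
      "columnDelimiter": "\t",
      "quoteChar": "",
      "firstRowAsHeader": false
    }
  }
}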

(Screenshot: sink settings of the SqlJsonToBlobText copy activity.)

Copy Activity #2: BlobJsonToCosmos

Next, we modify our ADF pipeline by adding a second Copy Activity that looks in Azure Blob Storage for the text file created by the first activity. It processes the file as a "JSON" source, inserting one document into the Cosmos DB sink for each JSON row found in the text file.
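Sketched as pipeline JSON, this second activity might look roughly like the following (dataset names are hypothetical; the source is a JSON-format dataset pointing at the same blob folder as the first activity's output):

{
  "name": "BlobJsonToCosmos",
  "type": "Copy",
  "inputs": [ { "referenceName": "OrdersJsonBlob", "type": "DatasetReference" } ],
  "outputs": [ { "referenceName": "OrdersCosmosContainer", "type": "DatasetReference" } ],
  "typeProperties": {
    "source": { "type": "JsonSource" },
    "sink": { "type": "CosmosDbSqlApiSink", "writeBehavior": "insert" }
  }
}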

(Screenshot: settings of the BlobJsonToCosmos copy activity.)

Optionally, we also add a "Delete" activity to the pipeline so that it deletes any previous files remaining in the /Orders/ folder prior to each run. Our ADF pipeline now looks something like this:

(Screenshot: the completed ADF pipeline with both copy activities and the Delete activity.)

After we trigger the pipeline above, we see a file created in our intermediary Azure Blob Storage location containing one JSON object per row:

(Screenshot: the intermediary blob file with one JSON object per row.)

We also see Orders documents, with OrderDetails properly embedded, inserted into our Cosmos DB collection:

(Screenshot: Orders documents with embedded OrderDetails in the Cosmos DB collection.)

Next steps