将一对多关系数据迁移到 Azure Cosmos DB for NoSQL 帐户

若要从关系数据库迁移到 Azure Cosmos DB for NoSQL,可能需要对数据模型进行更改进行优化。

一种常见转换是通过在一个 JSON 文档中嵌入相关子项来反规范化数据。 在这里,我们将使用 Azure 数据工厂或 Azure Databricks 来了解这一点的几个选项。 有关 Azure Cosmos DB 的数据建模的详细信息,请参阅 Azure Cosmos DB 中的数据建模

示例方案

假设 SQL 数据库中有以下两个表:Orders 和 OrderDetails。

显示 SQL 数据库中的 Orders 和 OrderDetails 表的屏幕截图。

我们希望在迁移过程中将此一对少数关系合并到一个 JSON 文档中。 要创建单个文档,请创建 T-SQL 查询并使用 FOR JSON

SELECT
  o.OrderID,
  o.OrderDate,
  o.FirstName,
  o.LastName,
  o.Address,
  o.City,
  o.State,
  o.PostalCode,
  o.Country,
  o.Phone,
  o.Total,
  (select OrderDetailId, ProductId, UnitPrice, Quantity from OrderDetails od where od.OrderId = o.OrderId for json auto) as OrderDetails
FROM Orders o;

此查询的结果将包括 Orders 表中的数据:

查询的屏幕截图,其中显示了各种订单的详细信息。

理想情况下,你想要使用单个 Azure 数据工厂 (ADF) 复制活动将 SQL 数据查询为源,并将输出作为适当的 JSON 对象直接写入 Azure Cosmos DB 接收器。 目前,无法在一个复制活动中执行所需的 JSON 转换。 如果尝试将上述查询的结果复制到 Azure Cosmos DB for NoSQL 容器中,我们将 OrderDetails 字段视为文档的字符串属性,而不是预期的 JSON 数组。

我们可以通过以下方式之一解决此当前限制:

  • 将 Azure 数据工厂与两个复制活动配合使用
    1. 将 SQL 中的 JSON 格式数据存储到中间 Blob 存储位置的文本文件中
    2. 将数据从 JSON 文本文件加载到 Azure Cosmos DB 中的容器。
  • 使用 Azure Databricks 从 SQL 读取和写入 Azure Cosmos DB - 我们在此处提供了两个选项。

让我们更详细地了解这些方法:

Azure 数据工厂

尽管无法在目标 Azure Cosmos DB 文档中将 OrderDetails 嵌入为 JSON 数组,但可以使用两个单独的复制活动来解决此问题。

复制任务 #1:SqlJsonToBlobText

对于源数据,我们使用 SQL 查询,通过 SQL Server 的 OPENJSON 和 FOR JSON PATH 功能,将结果集表示为单列,其中每行包含一个表示订单的 JSON 对象。

SELECT [value] FROM OPENJSON(
  (SELECT
    id = o.OrderID,
    o.OrderDate,
    o.FirstName,
    o.LastName,
    o.Address,
    o.City,
    o.State,
    o.PostalCode,
    o.Country,
    o.Phone,
    o.Total,
    (select OrderDetailId, ProductId, UnitPrice, Quantity from OrderDetails od where od.OrderId = o.OrderId for json auto) as OrderDetails
   FROM Orders o FOR JSON PATH)
)

ADF 复制操作中的预览值屏幕截图。

对于复制活动的接收器 SqlJsonToBlobText ,我们选择“带分隔符的文本”,并将其指向 Azure Blob 存储中的特定文件夹。 此汇聚点包含一个动态生成的唯一文件名(例如 @concat(pipeline().RunId,'.json'))。 由于文本文件不是真正“分隔的”,我们不希望它使用逗号解析为单独的列。 我们还希望保留双引号 (""),将“列分隔符”设置为 Tab ("\t"),或者设置为数据中未出现的其他字符,然后将“引用字符”设置为“无引用字符”。

突出显示列分隔符和引号字符设置的屏幕截图。

复制任务 #2:BlobJson到Cosmos

接下来,我们将通过添加第二个复制活动来修改 ADF 管道,该复制活动在 Azure Blob 存储中查找第一个活动创建的文本文件。 它将其作为“JSON”源进行处理,然后将文本文件中的每行 JSON 数据作为一个文档插入至 Azure Cosmos DB 接收器。

突出显示 JSON 源文件和“文件路径”字段的屏幕截图。

(可选)我们还向管道添加“删除”活动,以便在每次运行之前删除 /Orders/ 文件夹中剩余的所有文件。 我们的 ADF 管道现在如下所示:

突出显示“删除”活动的屏幕截图。

触发前面提到的管道后,可以看到在中间 Azure Blob 存储位置中创建的文件,其中包含每行一个 JSON 对象:

显示包含 JSON 对象的已创建文件的屏幕截图。

我们还可以看到 Orders 文档,这些文档中已正确嵌入了 OrderDetails,并已插入到 Azure Cosmos DB 集合中。

显示 Azure Cosmos DB 文档中订单详细信息的屏幕截图

Azure Databricks

我们还可以使用 Azure Databricks 中的 Spark 将数据从 SQL 数据库源复制到 Azure Cosmos DB 目标,而无需在 Azure Blob 存储中创建中间文本/JSON 文件。

注释

为了清楚和简单起见,代码片段包含显式内联的虚拟数据库密码,但理想情况下应使用 Azure Databricks 机密。

首先,创建所需的 SQL 连接器Azure Cosmos DB 连接器 库并将其附加到 Azure Databricks 群集。 重启群集以确保加载库。

显示创建所需的 SQL 连接器和 Azure Cosmos DB 连接器库并将其附加到 Azure Databricks 群集的位置的屏幕截图。

接下来,我们提供了两个适用于 Scala 和 Python 的示例。

Scala

在这里,我们将 SQL 查询的结果与“FOR JSON”输出一起输出到数据帧中:

// Connect to Azure SQL /connectors/sql/
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
val configSql = Config(Map(
  "url"          -> "xxxx.database.chinacloudapi.cn",
  "databaseName" -> "xxxx",
  "queryCustom"  -> "SELECT o.OrderID, o.OrderDate, o.FirstName, o.LastName, o.Address, o.City, o.State, o.PostalCode, o.Country, o.Phone, o.Total, (SELECT OrderDetailId, ProductId, UnitPrice, Quantity FROM OrderDetails od WHERE od.OrderId = o.OrderId FOR JSON AUTO) as OrderDetails FROM Orders o",
  "user"         -> "xxxx", 
  "password"     -> "xxxx" // NOTE: For clarity and simplicity, this example includes secrets explicitely as a string, but you should always use Databricks secrets
))
// Create DataFrame from Azure SQL query
val orders = sqlContext.read.sqlDB(configSql)
display(orders)

显示 DataFrame 中的 SQL 查询输出的屏幕截图。

接下来,连接到我们的 Azure Cosmos DB 数据库和集合:

// Connect to Azure Cosmos DB https://docs.databricks.com/data/data-sources/azure/cosmosdb-connector.html
import org.joda.time._
import org.joda.time.format._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark
import com.microsoft.azure.cosmosdb.spark.config.Config
import org.apache.spark.sql.functions._
import org.joda.time._
import org.joda.time.format._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark
import com.microsoft.azure.cosmosdb.spark.config.Config
import org.apache.spark.sql.functions._
val configMap = Map(
  "Endpoint" -> "https://xxxx.documents.azure.cn:443/",
  // NOTE: For clarity and simplicity, this example includes secrets explicitely as a string, but you should always use Databricks secrets
  "Masterkey" -> "xxxx",
  "Database" -> "StoreDatabase",
  "Collection" -> "Orders")
val configAzure Cosmos DB= Config(configMap)

最后,我们定义架构,并在将数据帧保存到 Cosmos DB 集合之前,使用 from_json 将其应用于数据帧。

// Convert DataFrame to proper nested schema
import org.apache.spark.sql.types._
val orderDetailsSchema = ArrayType(StructType(Array(
    StructField("OrderDetailId",IntegerType,true),
    StructField("ProductId",IntegerType,true),
    StructField("UnitPrice",DoubleType,true),
    StructField("Quantity",IntegerType,true)
  )))
val ordersWithSchema = orders.select(
  col("OrderId").cast("string").as("id"),
  col("OrderDate").cast("string"),
  col("FirstName").cast("string"),
  col("LastName").cast("string"),
  col("Address").cast("string"),
  col("City").cast("string"),
  col("State").cast("string"),
  col("PostalCode").cast("string"),
  col("Country").cast("string"),
  col("Phone").cast("string"),
  col("Total").cast("double"),
  from_json(col("OrderDetails"), orderDetailsSchema).as("OrderDetails")
)
display(ordersWithSchema)
// Save nested data to Azure Cosmos DB
CosmosDBSpark.save(ordersWithSchema, configCosmos)

突出显示用于保存到 Azure Cosmos DB 集合的正确数组的屏幕截图。

Python

作为替代方法,如果源数据库不支持 FOR JSON 或类似作,可能需要在 Spark 中执行 JSON 转换。 或者,您可以对大型数据集进行并行操作。 此处提供了 PySpark 示例。 首先在第一个单元中配置源数据库连接和目标数据库连接:

import uuid
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import StringType,DateType,LongType,IntegerType,TimestampType
 
#JDBC connect details for SQL Server database
jdbcHostname = "jdbcHostname"
jdbcDatabase = "OrdersDB"
jdbcUsername = "jdbcUsername"
jdbcPassword = "jdbcPassword"
jdbcPort = "1433"
 
connectionProperties = {
  "user" : jdbcUsername,
  "password" : jdbcPassword,
  "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2};user={3};password={4}".format(jdbcHostname, jdbcPort, jdbcDatabase, jdbcUsername, jdbcPassword)
 
#Connect details for Target Azure Cosmos DB for NoSQL account
writeConfig = {
    "Endpoint": "Endpoint",
    "Masterkey": "Masterkey",
    "Database": "OrdersDB",
    "Collection": "Orders",
    "Upsert": "true"
}

然后,我们将查询源数据库(在本例中为 SQL Server),获取订单和订单详细信息记录,并将结果放入 Spark 数据帧。 我们还创建一个列表,其中包含所有订单 ID,以及用于并行作的线程池:

import json
import ast
import pyspark.sql.functions as F
import uuid
import numpy as np
import pandas as pd
from functools import reduce
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import *
from pyspark.sql.functions import exp
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.functions import array
from pyspark.sql.types import *
from multiprocessing.pool import ThreadPool
 
#get all orders
orders = sqlContext.read.jdbc(url=jdbcUrl, table="orders", properties=connectionProperties)
 
#get all order details
orderdetails = sqlContext.read.jdbc(url=jdbcUrl, table="orderdetails", properties=connectionProperties)
 
#get all OrderId values to pass to map function 
orderids = orders.select('OrderId').collect()
 
#create thread pool big enough to process merge of details to orders in parallel
pool = ThreadPool(10)

然后,创建一个函数,用于将 Orders 写入 NoSQL 集合的目标 API。 此函数筛选给定订单 ID 的所有订单详细信息,将其转换为 JSON 数组,并将数组插入 JSON 文档。 然后,JSON 文档将写入用于该订单的 NoSQL 容器 API。

def writeOrder(orderid):
  #filter the order on current value passed from map function
  order = orders.filter(orders['OrderId'] == orderid[0])
  
  #set id to be a uuid
  order = order.withColumn("id", lit(str(uuid.uuid1())))
  
  #add details field to order dataframe
  order = order.withColumn("details", lit(''))
  
  #filter order details dataframe to get details we want to merge into the order document
  orderdetailsgroup = orderdetails.filter(orderdetails['OrderId'] == orderid[0])
  
  #convert dataframe to pandas
  orderpandas = order.toPandas()
  
  #convert the order dataframe to json and remove enclosing brackets
  orderjson = orderpandas.to_json(orient='records', force_ascii=False)
  orderjson = orderjson[1:-1] 
  
  #convert orderjson to a dictionaory so we can set the details element with order details later
  orderjsondata = json.loads(orderjson)
  
  
  #convert orderdetailsgroup dataframe to json, but only if details were returned from the earlier filter
  if (orderdetailsgroup.count() !=0):
    #convert orderdetailsgroup to pandas dataframe to work better with json
    orderdetailsgroup = orderdetailsgroup.toPandas()
    
    #convert orderdetailsgroup to json string
    jsonstring = orderdetailsgroup.to_json(orient='records', force_ascii=False)
    
    #convert jsonstring to dictionary to ensure correct encoding and no corrupt records
    jsonstring = json.loads(jsonstring)
    
    #set details json element in orderjsondata to jsonstring which contains orderdetailsgroup - this merges order details into the order 
    orderjsondata['details'] = jsonstring
 
  #convert dictionary to json
  orderjsondata = json.dumps(orderjsondata)
 
  #read the json into spark dataframe
  df = spark.read.json(sc.parallelize([orderjsondata]))
  
  #write the dataframe (this will be a single order record with merged many-to-one order details) to Azure Cosmos DB db using spark the connector
  #/cosmos-db/spark-connector
  df.write.format("com.microsoft.azure.cosmosdb.spark").mode("append").options(**writeConfig).save()

最后,我们使用线程池上的映射函数调用 Python writeOrder 函数,以并行执行,并传入之前创建的顺序 ID 列表:

#map order details to orders in parallel using the above function
pool.map(writeOrder, orderids)

在任一方法下,最终我们都应该在 Azure Cosmos DB 集合中的每个 Order 文档中正确地保存嵌入的 OrderDetails。

迁移后生成的数据的屏幕截图。

后续步骤