若要从关系数据库迁移到 Azure Cosmos DB for NoSQL,可能需要对数据模型进行更改进行优化。
一种常见转换是通过在一个 JSON 文档中嵌入相关子项来反规范化数据。 在这里,我们将使用 Azure 数据工厂或 Azure Databricks 来了解这一点的几个选项。 有关 Azure Cosmos DB 的数据建模的详细信息,请参阅 Azure Cosmos DB 中的数据建模。
示例方案
假设 SQL 数据库中有以下两个表:Orders 和 OrderDetails。
我们希望在迁移过程中将此一对少数关系合并到一个 JSON 文档中。 要创建单个文档,请创建 T-SQL 查询并使用 FOR JSON:
SELECT
o.OrderID,
o.OrderDate,
o.FirstName,
o.LastName,
o.Address,
o.City,
o.State,
o.PostalCode,
o.Country,
o.Phone,
o.Total,
(select OrderDetailId, ProductId, UnitPrice, Quantity from OrderDetails od where od.OrderId = o.OrderId for json auto) as OrderDetails
FROM Orders o;
此查询的结果将包括 Orders 表中的数据:
理想情况下,你想要使用单个 Azure 数据工厂 (ADF) 复制活动将 SQL 数据查询为源,并将输出作为适当的 JSON 对象直接写入 Azure Cosmos DB 接收器。 目前,无法在一个复制活动中执行所需的 JSON 转换。 如果尝试将上述查询的结果复制到 Azure Cosmos DB for NoSQL 容器中,我们将 OrderDetails 字段视为文档的字符串属性,而不是预期的 JSON 数组。
我们可以通过以下方式之一解决此当前限制:
-
将 Azure 数据工厂与两个复制活动配合使用:
- 将 SQL 中的 JSON 格式数据存储到中间 Blob 存储位置的文本文件中
- 将数据从 JSON 文本文件加载到 Azure Cosmos DB 中的容器。
- 使用 Azure Databricks 从 SQL 读取和写入 Azure Cosmos DB - 我们在此处提供了两个选项。
让我们更详细地了解这些方法:
Azure 数据工厂
尽管无法在目标 Azure Cosmos DB 文档中将 OrderDetails 嵌入为 JSON 数组,但可以使用两个单独的复制活动来解决此问题。
复制任务 #1:SqlJsonToBlobText
对于源数据,我们使用 SQL 查询,通过 SQL Server 的 OPENJSON 和 FOR JSON PATH 功能,将结果集表示为单列,其中每行包含一个表示订单的 JSON 对象。
SELECT [value] FROM OPENJSON(
(SELECT
id = o.OrderID,
o.OrderDate,
o.FirstName,
o.LastName,
o.Address,
o.City,
o.State,
o.PostalCode,
o.Country,
o.Phone,
o.Total,
(select OrderDetailId, ProductId, UnitPrice, Quantity from OrderDetails od where od.OrderId = o.OrderId for json auto) as OrderDetails
FROM Orders o FOR JSON PATH)
)
对于复制活动的接收器 SqlJsonToBlobText ,我们选择“带分隔符的文本”,并将其指向 Azure Blob 存储中的特定文件夹。 此汇聚点包含一个动态生成的唯一文件名(例如 @concat(pipeline().RunId,'.json'))。
由于文本文件不是真正“分隔的”,我们不希望它使用逗号解析为单独的列。 我们还希望保留双引号 (""),将“列分隔符”设置为 Tab ("\t"),或者设置为数据中未出现的其他字符,然后将“引用字符”设置为“无引用字符”。
复制任务 #2:BlobJson到Cosmos
接下来,我们将通过添加第二个复制活动来修改 ADF 管道,该复制活动在 Azure Blob 存储中查找第一个活动创建的文本文件。 它将其作为“JSON”源进行处理,然后将文本文件中的每行 JSON 数据作为一个文档插入至 Azure Cosmos DB 接收器。
(可选)我们还向管道添加“删除”活动,以便在每次运行之前删除 /Orders/ 文件夹中剩余的所有文件。 我们的 ADF 管道现在如下所示:
触发前面提到的管道后,可以看到在中间 Azure Blob 存储位置中创建的文件,其中包含每行一个 JSON 对象:
我们还可以看到 Orders 文档,这些文档中已正确嵌入了 OrderDetails,并已插入到 Azure Cosmos DB 集合中。
Azure Databricks
我们还可以使用 Azure Databricks 中的 Spark 将数据从 SQL 数据库源复制到 Azure Cosmos DB 目标,而无需在 Azure Blob 存储中创建中间文本/JSON 文件。
注释
为了清楚和简单起见,代码片段包含显式内联的虚拟数据库密码,但理想情况下应使用 Azure Databricks 机密。
首先,创建所需的 SQL 连接器 和 Azure Cosmos DB 连接器 库并将其附加到 Azure Databricks 群集。 重启群集以确保加载库。
接下来,我们提供了两个适用于 Scala 和 Python 的示例。
Scala
在这里,我们将 SQL 查询的结果与“FOR JSON”输出一起输出到数据帧中:
// Connect to Azure SQL /connectors/sql/
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
val configSql = Config(Map(
"url" -> "xxxx.database.chinacloudapi.cn",
"databaseName" -> "xxxx",
"queryCustom" -> "SELECT o.OrderID, o.OrderDate, o.FirstName, o.LastName, o.Address, o.City, o.State, o.PostalCode, o.Country, o.Phone, o.Total, (SELECT OrderDetailId, ProductId, UnitPrice, Quantity FROM OrderDetails od WHERE od.OrderId = o.OrderId FOR JSON AUTO) as OrderDetails FROM Orders o",
"user" -> "xxxx",
"password" -> "xxxx" // NOTE: For clarity and simplicity, this example includes secrets explicitely as a string, but you should always use Databricks secrets
))
// Create DataFrame from Azure SQL query
val orders = sqlContext.read.sqlDB(configSql)
display(orders)
接下来,连接到我们的 Azure Cosmos DB 数据库和集合:
// Connect to Azure Cosmos DB https://docs.databricks.com/data/data-sources/azure/cosmosdb-connector.html
import org.joda.time._
import org.joda.time.format._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark
import com.microsoft.azure.cosmosdb.spark.config.Config
import org.apache.spark.sql.functions._
import org.joda.time._
import org.joda.time.format._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark
import com.microsoft.azure.cosmosdb.spark.config.Config
import org.apache.spark.sql.functions._
val configMap = Map(
"Endpoint" -> "https://xxxx.documents.azure.cn:443/",
// NOTE: For clarity and simplicity, this example includes secrets explicitely as a string, but you should always use Databricks secrets
"Masterkey" -> "xxxx",
"Database" -> "StoreDatabase",
"Collection" -> "Orders")
val configAzure Cosmos DB= Config(configMap)
最后,我们定义架构,并在将数据帧保存到 Cosmos DB 集合之前,使用 from_json 将其应用于数据帧。
// Convert DataFrame to proper nested schema
import org.apache.spark.sql.types._
val orderDetailsSchema = ArrayType(StructType(Array(
StructField("OrderDetailId",IntegerType,true),
StructField("ProductId",IntegerType,true),
StructField("UnitPrice",DoubleType,true),
StructField("Quantity",IntegerType,true)
)))
val ordersWithSchema = orders.select(
col("OrderId").cast("string").as("id"),
col("OrderDate").cast("string"),
col("FirstName").cast("string"),
col("LastName").cast("string"),
col("Address").cast("string"),
col("City").cast("string"),
col("State").cast("string"),
col("PostalCode").cast("string"),
col("Country").cast("string"),
col("Phone").cast("string"),
col("Total").cast("double"),
from_json(col("OrderDetails"), orderDetailsSchema).as("OrderDetails")
)
display(ordersWithSchema)
// Save nested data to Azure Cosmos DB
CosmosDBSpark.save(ordersWithSchema, configCosmos)
Python
作为替代方法,如果源数据库不支持 FOR JSON 或类似作,可能需要在 Spark 中执行 JSON 转换。 或者,您可以对大型数据集进行并行操作。 此处提供了 PySpark 示例。 首先在第一个单元中配置源数据库连接和目标数据库连接:
import uuid
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import StringType,DateType,LongType,IntegerType,TimestampType
#JDBC connect details for SQL Server database
jdbcHostname = "jdbcHostname"
jdbcDatabase = "OrdersDB"
jdbcUsername = "jdbcUsername"
jdbcPassword = "jdbcPassword"
jdbcPort = "1433"
connectionProperties = {
"user" : jdbcUsername,
"password" : jdbcPassword,
"driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2};user={3};password={4}".format(jdbcHostname, jdbcPort, jdbcDatabase, jdbcUsername, jdbcPassword)
#Connect details for Target Azure Cosmos DB for NoSQL account
writeConfig = {
"Endpoint": "Endpoint",
"Masterkey": "Masterkey",
"Database": "OrdersDB",
"Collection": "Orders",
"Upsert": "true"
}
然后,我们将查询源数据库(在本例中为 SQL Server),获取订单和订单详细信息记录,并将结果放入 Spark 数据帧。 我们还创建一个列表,其中包含所有订单 ID,以及用于并行作的线程池:
import json
import ast
import pyspark.sql.functions as F
import uuid
import numpy as np
import pandas as pd
from functools import reduce
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import *
from pyspark.sql.functions import exp
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.functions import array
from pyspark.sql.types import *
from multiprocessing.pool import ThreadPool
#get all orders
orders = sqlContext.read.jdbc(url=jdbcUrl, table="orders", properties=connectionProperties)
#get all order details
orderdetails = sqlContext.read.jdbc(url=jdbcUrl, table="orderdetails", properties=connectionProperties)
#get all OrderId values to pass to map function
orderids = orders.select('OrderId').collect()
#create thread pool big enough to process merge of details to orders in parallel
pool = ThreadPool(10)
然后,创建一个函数,用于将 Orders 写入 NoSQL 集合的目标 API。 此函数筛选给定订单 ID 的所有订单详细信息,将其转换为 JSON 数组,并将数组插入 JSON 文档。 然后,JSON 文档将写入用于该订单的 NoSQL 容器 API。
def writeOrder(orderid):
#filter the order on current value passed from map function
order = orders.filter(orders['OrderId'] == orderid[0])
#set id to be a uuid
order = order.withColumn("id", lit(str(uuid.uuid1())))
#add details field to order dataframe
order = order.withColumn("details", lit(''))
#filter order details dataframe to get details we want to merge into the order document
orderdetailsgroup = orderdetails.filter(orderdetails['OrderId'] == orderid[0])
#convert dataframe to pandas
orderpandas = order.toPandas()
#convert the order dataframe to json and remove enclosing brackets
orderjson = orderpandas.to_json(orient='records', force_ascii=False)
orderjson = orderjson[1:-1]
#convert orderjson to a dictionaory so we can set the details element with order details later
orderjsondata = json.loads(orderjson)
#convert orderdetailsgroup dataframe to json, but only if details were returned from the earlier filter
if (orderdetailsgroup.count() !=0):
#convert orderdetailsgroup to pandas dataframe to work better with json
orderdetailsgroup = orderdetailsgroup.toPandas()
#convert orderdetailsgroup to json string
jsonstring = orderdetailsgroup.to_json(orient='records', force_ascii=False)
#convert jsonstring to dictionary to ensure correct encoding and no corrupt records
jsonstring = json.loads(jsonstring)
#set details json element in orderjsondata to jsonstring which contains orderdetailsgroup - this merges order details into the order
orderjsondata['details'] = jsonstring
#convert dictionary to json
orderjsondata = json.dumps(orderjsondata)
#read the json into spark dataframe
df = spark.read.json(sc.parallelize([orderjsondata]))
#write the dataframe (this will be a single order record with merged many-to-one order details) to Azure Cosmos DB db using spark the connector
#/cosmos-db/spark-connector
df.write.format("com.microsoft.azure.cosmosdb.spark").mode("append").options(**writeConfig).save()
最后,我们使用线程池上的映射函数调用 Python writeOrder 函数,以并行执行,并传入之前创建的顺序 ID 列表:
#map order details to orders in parallel using the above function
pool.map(writeOrder, orderids)
在任一方法下,最终我们都应该在 Azure Cosmos DB 集合中的每个 Order 文档中正确地保存嵌入的 OrderDetails。