教程:使用 Spark 连接到 Azure Cosmos DB for NoSQL
适用范围: NoSQL
本教程使用 Azure Cosmos DB Spark 连接器从 Azure Cosmos DB for NoSQL 帐户读取或写入数据。 本教程使用 Azure Databricks 和 Jupyter 笔记本来说明如何从 Spark 与 API for NoSQL 集成。 本教程重点介绍 Python 和 Scala,不过你可以使用 Spark 支持的任何语言或界面。
本教程介绍如何执行下列操作:
- 使用 Spark 和 Jupyter Notebook 连接到 API for NoSQL 帐户。
- 创建数据库和容器资源。
- 将数据引入容器。
- 查询容器中的数据。
- 对容器中的项执行常见操作。
先决条件
- 一个现有的适用于 NoSQL 的 Azure Cosmos DB 帐户。
- 如果你有现有的 Azure 订阅,请创建一个新帐户。
- 现有的 Azure Databricks 工作区。
使用 Spark 和 Jupyter 进行连接
使用现有的 Azure Databricks 工作区创建一个可以使用 Apache Spark 3.4.x 连接到 Azure Cosmos DB for NoSQL 帐户的计算群集。
打开 Azure Databricks 工作区。
在工作区界面中,创建新的群集。 至少使用以下设置配置群集:
版本 值 运行时版本 13.3 LTS(Scala 2.12、Spark 3.4.1) 使用工作区界面从 Maven Central 搜索组 ID 为
com.azure.cosmos.spark
的 Maven 包。 将特定于 Spark 3.4 且项目 ID 前缀为azure-cosmos-spark_3-4
的包安装到群集。最后,创建新的笔记本。
提示
默认情况下,笔记本会附加到最近创建的群集。
在笔记本中,设置 NoSQL 帐户终结点、数据库名称和容器名称的联机事务处理连接器 (OLTP) 配置设置。
# Set configuration settings config = { "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>", "spark.cosmos.accountKey": "<nosql-account-key>", "spark.cosmos.database": "cosmicworks", "spark.cosmos.container": "products" }
# Set configuration settings val config = Map( "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>", "spark.cosmos.accountKey" -> "<nosql-account-key>", "spark.cosmos.database" -> "cosmicworks", "spark.cosmos.container" -> "products" )
创建数据库和容器
使用目录 API 管理帐户资源,例如数据库和容器。 然后,可以使用 OLTP 管理容器资源中的数据。
配置目录 API 以使用 Spark 来管理 API for NoSQL 资源。
# Configure Catalog Api spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"]) spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"])
// Configure Catalog Api spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint")) spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
使用
CREATE DATABASE IF NOT EXISTS
创建名为cosmicworks
的新数据库。# Create a database by using the Catalog API spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
// Create a database by using the Catalog API spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
使用
CREATE TABLE IF NOT EXISTS
创建名为products
的新容器。 确保将分区键路径设置为/category
并启用自动缩放吞吐量,最大吞吐量为每秒1000
个请求单位 (RU)。# Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
// Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
使用分层分区键配置创建另一个名为
employees
的容器。 使用/organization
、/department
和/team
作为分区键路径集。 按照该特定顺序操作。 此外,将吞吐量手动设置为400
RU。# Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
// Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
运行笔记本单元格,以验证数据库和容器是否是在 API for NoSQL 帐户中创建的。
引入数据
创建示例数据集。 然后使用 OLTP 将数据引入 API for NoSQL 容器。
创建示例数据集。
# Create sample data products = ( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True) )
// Create sample data val products = Seq( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true) )
使用
spark.createDataFrame
和以前保存的 OLTP 配置将示例数据添加到目标容器。# Ingest sample data spark.createDataFrame(products) \ .toDF("id", "category", "name", "quantity", "price", "clearance") \ .write \ .format("cosmos.oltp") \ .options(**config) \ .mode("APPEND") \ .save()
// Ingest sample data spark.createDataFrame(products) .toDF("id", "category", "name", "quantity", "price", "clearance") .write .format("cosmos.oltp") .options(config) .mode("APPEND") .save()
查询数据
将 OLTP 数据加载到数据帧中,以对数据执行常见查询。 可以使用各种语法筛选或查询数据。
使用
spark.read
将 OLTP 数据加载到数据帧对象中。 使用本教程前面使用的相同配置。 此外,将spark.cosmos.read.inferSchema.enabled
设置为true
,以允许 Spark 连接器通过采样现有项来推断架构。# Load data df = spark.read.format("cosmos.oltp") \ .options(**config) \ .option("spark.cosmos.read.inferSchema.enabled", "true") \ .load()
// Load data val df = spark.read.format("cosmos.oltp") .options(config) .option("spark.cosmos.read.inferSchema.enabled", "true") .load()
使用
printSchema
呈现在数据帧中加载的数据的架构。# Render schema df.printSchema()
// Render schema df.printSchema()
呈现
quantity
列小于20
的数据行。 使用where
和show
函数执行此查询。# Render filtered data df.where("quantity < 20") \ .show()
// Render filtered data df.where("quantity < 20") .show()
呈现第一个数据行,其中
clearance
列为true
。 使用filter
函数执行此查询。# Render 1 row of flitered data df.filter(df.clearance == True) \ .show(1)
// Render 1 row of flitered data df.filter($"clearance" === true) .show(1)
呈现五行数据,且无筛选器或截断。 使用
show
函数自定义所呈现行的外观和行数。# Render five rows of unfiltered and untruncated data df.show(5, False)
// Render five rows of unfiltered and untruncated data df.show(5, false)
使用以下原始 NoSQL 查询字符串查询数据:
SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800
# Render results of raw query rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" rawDf = spark.sql(rawQuery) rawDf.show()
// Render results of raw query val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" val rawDf = spark.sql(rawQuery) rawDf.show()
执行常见操作
在 Spark 中使用 API for NoSQL 数据时,可以执行部分更新或将数据作为原始 JSON 处理。
若要执行项的部分更新,请执行以下步骤:
复制现有
config
配置变量并修改新副本中的属性。 具体而言,将写入策略配置为ItemPatch
。 然后禁用批量操作支持。 设置列和映射操作。 最后,将默认操作类型设置为Set
。# Copy and modify configuration configPatch = dict(config) configPatch["spark.cosmos.write.strategy"] = "ItemPatch" configPatch["spark.cosmos.write.bulk.enabled"] = "false" configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set" configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
// Copy and modify configuration val configPatch = scala.collection.mutable.Map.empty ++ config configPatch ++= Map( "spark.cosmos.write.strategy" -> "ItemPatch", "spark.cosmos.write.bulk.enabled" -> "false", "spark.cosmos.write.patch.defaultOperationType" -> "Set", "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]" )
为要作为此修补操作目标的项分区键和唯一标识符创建变量。
# Specify target item id and partition key targetItemId = "68719518391" targetItemPartitionKey = "gear-surf-surfboards"
// Specify target item id and partition key val targetItemId = "68719518391" val targetItemPartitionKey = "gear-surf-surfboards"
创建一组修补程序对象以指定目标项并指定应修改的字段。
# Create set of patch diffs patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
// Create set of patch diffs val patchProducts = Seq( (targetItemId, targetItemPartitionKey, "Yamba New Surfboard") )
使用修补程序对象集创建数据帧。 使用
write
执行修补操作。# Create data frame spark.createDataFrame(patchProducts) \ .write \ .format("cosmos.oltp") \ .options(**configPatch) \ .mode("APPEND") \ .save()
// Create data frame patchProducts .toDF("id", "category", "name") .write .format("cosmos.oltp") .options(configPatch) .mode("APPEND") .save()
运行查询以查看修补操作的结果。 现在,该项应命名为
Yamba New Surfboard
,无需进行其他更改。# Create and run query patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'" patchDf = spark.sql(patchQuery) patchDf.show(1)
// Create and run query val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'" val patchDf = spark.sql(patchQuery) patchDf.show(1)
若要使用原始 JSON 数据,请执行以下操作:
复制现有
config
配置变量并修改新副本中的属性。 具体而言,将目标容器更改为employees
。 然后将contacts
列/字段配置为使用原始 JSON 数据。# Copy and modify configuration configRawJson = dict(config) configRawJson["spark.cosmos.container"] = "employees" configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
// Copy and modify configuration val configRawJson = scala.collection.mutable.Map.empty ++ config configRawJson ++= Map( "spark.cosmos.container" -> "employees", "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]" )
创建一组员工以引入容器。
# Create employee data employees = ( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), )
// Create employee data val employees = Seq( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""") )
创建数据帧并使用
write
引入员工数据。# Ingest data spark.createDataFrame(employees) \ .toDF("id", "organization", "department", "team", "name", "contacts") \ .write \ .format("cosmos.oltp") \ .options(**configRawJson) \ .mode("APPEND") \ .save()
// Ingest data spark.createDataFrame(employees) .toDF("id", "organization", "department", "team", "name", "contacts") .write .format("cosmos.oltp") .options(configRawJson) .mode("APPEND") .save()
使用
show
呈现数据帧中的数据。 请注意,contacts
列是输出中的原始 JSON。# Read and render data rawJsonDf = spark.read.format("cosmos.oltp") \ .options(**configRawJson) \ .load() rawJsonDf.show()
// Read and render data val rawJsonDf = spark.read.format("cosmos.oltp") .options(configRawJson) .load() rawJsonDf.show()
相关内容
- Apache Spark
- Azure Cosmos DB 目录 API
- 配置参数引用
- Azure Cosmos DB Spark 连接器示例
- 从 Spark 2.4 迁移到 Spark 3.*
- 版本兼容性:
- 发行说明:
- 下载链接: