快速入门:使用适用于 SQL API 的 Azure Cosmos DB Spark 3 OLTP 连接器管理数据

适用于: SQL API

Cosmos DB Spark 连接器预览版的快速入门指南

本教程是一个快速入门指南,介绍如何使用 Cosmos DB Spark 连接器从 Cosmos DB 中读取或向其写入。 Cosmos DB Spark 连接器基于 Spark 3.1.x。

在本快速教程中,我们将依靠包含 Spark 3.1.1 的 Azure Databricks Runtime 8.0 和 Jupyter Notebook 来演示如何使用 Cosmos DB Spark 连接器。

你也可以使用任何其他 Spark 3.1.1 Spark 产品/服务,而且还应该能够使用 Spark 支持的任何语言(PySpark、Scala、Java 等),或你熟悉的任何 Spark 界面(Jupyter Notebook、Livy 等)。

先决条件

仅当计划使用日志记录时,才需要 SLF4J。还请下载 SLF4J 绑定,该绑定可将 SLF4J API 与你选择的记录实现链接在一起。 有关详细信息,请参阅 SLF4J 用户手册

在 Spark 群集中安装 Cosmos DB Spark 连接器 azure-cosmos-spark_3-1_2-12-4.1.0.jar

本入门指南基于 PySpark,但你也可以使用对应的 scala 版本,并可以在 Azure Databricks PySpark 笔记本中运行以下代码片段。

创建数据库和容器

首先设置 Cosmos DB 帐户凭据,以及 Cosmos DB 数据库名称和容器名称。

cosmosEndpoint = "https://REPLACEME.documents.azure.cn:443/"
cosmosMasterKey = "REPLACEME"
cosmosDatabaseName = "sampleDB"
cosmosContainerName = "sampleContainer"

cfg = {
  "spark.cosmos.accountEndpoint" : cosmosEndpoint,
  "spark.cosmos.accountKey" : cosmosMasterKey,
  "spark.cosmos.database" : cosmosDatabaseName,
  "spark.cosmos.container" : cosmosContainerName,
}

接下来,可以使用新的目录 API 通过 Spark 创建 Cosmos DB 数据库和容器。

# Configure Catalog Api to be used
spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", cosmosEndpoint)
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", cosmosMasterKey)

# create a cosmos database using catalog api
spark.sql("CREATE DATABASE IF NOT EXISTS cosmosCatalog.{};".format(cosmosDatabaseName))

# create a cosmos container using catalog api
spark.sql("CREATE TABLE IF NOT EXISTS cosmosCatalog.{}.{} using cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/id', manualThroughput = '1100')".format(cosmosDatabaseName, cosmosContainerName))

使用目录 API 创建容器时,可为要创建的容器设置吞吐量和分区键路径

有关更多详细信息,请参阅完整的目录 API 文档。

引入数据

数据源的名称为 cosmos.oltp,以下示例演示如何在 Cosmos DB 中写入由两个项构成的内存数据帧:

spark.createDataFrame((("cat-alive", "Schrodinger cat", 2, True), ("cat-dead", "Schrodinger cat", 2, False)))\
  .toDF("id","name","age","isAlive") \
   .write\
   .format("cosmos.oltp")\
   .options(**cfg)\
   .mode("APPEND")\
   .save()

请注意,id 是 Cosmos DB 的必填字段。

有关引入数据的更多详细信息,请参阅完整的写入配置文档。

正在查询数据

可以使用同一 cosmos.oltp 数据源查询数据,并使用 filter 来下推筛选器:

from pyspark.sql.functions import col

df = spark.read.format("cosmos.oltp").options(**cfg)\
 .option("spark.cosmos.read.inferSchema.enabled", "true")\
 .load()

df.filter(col("isAlive") == True)\
 .show()

有关查询数据的更多详细信息,请参阅完整的查询配置文档。

架构推理

查询数据时,Spark 连接器可以通过将 spark.cosmos.read.inferSchema.enabled 设置为 true,基于对现有项的采样来推理架构。

df = spark.read.format("cosmos.oltp").options(**cfg)\
 .option("spark.cosmos.read.inferSchema.enabled", "true")\
 .load()

df.printSchema()

或者,你可以传递要用来读取数据的自定义架构:

customSchema = StructType([
      StructField("id", StringType()),
      StructField("name", StringType()),
      StructField("type", StringType()),
      StructField("age", IntegerType()),
      StructField("isAlive", BooleanType())
    ])

df = spark.read.schema(schema).format("cosmos.oltp").options(**cfg)\
 .load()

df.printSchema()

如果未指定自定义架构并已禁用架构推理,则生成的数据将返回项的原始 JSON 内容:

df = spark.read.format("cosmos.oltp").options(**cfg)\
 .load()

df.printSchema()

有关架构推理的更多详细信息,请参阅完整的架构推理配置文档。

配置参考:

常规配置

配置属性名称 默认 说明
spark.cosmos.accountEndpoint Cosmos DB 帐户终结点 URI
spark.cosmos.accountKey Cosmos DB 帐户密钥
spark.cosmos.database Cosmos DB 数据库名称
spark.cosmos.container Cosmos DB 容器名称

其他优化方法

配置属性名称 默认 说明
spark.cosmos.useGatewayMode false 对客户端操作使用网关模式
spark.cosmos.read.forceEventualConsistency true 使客户端对读取操作使用最终一致性,而不是使用默认的帐户级别一致性
spark.cosmos.applicationName 应用程序名称
spark.cosmos.preferredRegionsList 要用于多区域 Cosmos DB 帐户的首选区域列表。 这是一个逗号分隔值(例如 [China East, China North]China East, China North),前提是首选区域将用作提示。 应使用与 Cosmos DB 帐户并置在一起的 Spark 群集,并将 Spark 群集区域作为首选区域传递。 请参阅此处的 Azure 区域列表

写入配置

配置属性名称 默认 说明
spark.cosmos.write.strategy ItemOverwrite Cosmos DB 项写入策略:ItemOverwrite(使用 upsert)、ItemAppend(使用 create,忽略现存的项,例如冲突项)、ItemDelete(删除所有文档)、ItemDeleteIfNotModified(删除尚未更改其 etag 的所有文档)
spark.cosmos.write.maxRetryCount 10 发生可重试的失败(例如连接错误,moderakh 将添加更多详细信息)时的 Cosmos DB 写入最大重试次数
spark.cosmos.write.point.maxConcurrency Cosmos DB 项写入的最大并发数。 如果此值未指定,将根据 Spark 执行器 VM 大小确定
spark.cosmos.write.bulk.maxPendingOperations Cosmos DB 项写入批量模式下的最大挂起操作数。 定义并发处理的批量操作限制。 如果未指定,将根据 Spark 执行器 VM 大小确定此值。 如果数据量对于目标容器的预配吞吐量而言太大,可以遵循 1000 x Cores 的估算来调整此设置
spark.cosmos.write.bulk.enabled true 已启用 Cosmos DB 项写入批量

查询配置

配置属性名称 默认 说明
spark.cosmos.read.customQuery 如果已提供此值,则会根据 Cosmos 终结点处理自定义查询,而不是通过谓词下推来动态生成查询。 通常我们建议依赖于 Spark 的谓词下推,因为这样可以基于查询计划生成最高效的筛选器集。 但是,有几个谓词(例如 count、group by、avg、sum 等聚合)目前无法下推(至少在 Spark 3.1 中是这样的)- 因此自定义查询是一种回退方式,可将这些谓词推送到发往 Cosmos 的查询中。

架构推理配置

执行读取操作时,用户可指定自定义架构,或允许连接器推断架构。 默认启用架构推理。

配置属性名称 默认 说明
spark.cosmos.read.inferSchema.enabled true 架构推理处于禁用状态并且用户未提供架构时,将返回原始 json。
spark.cosmos.read.inferSchema.query SELECT * FROM r 架构推理启用后,将用作自定义查询来推断架构。 例如,如果在一个容器中存储具有不同架构的多个实体,并且希望确保推理仅查看特定文档类型,或希望只投影特定的列。
spark.cosmos.read.inferSchema.samplingSize 1000 推断架构而不使用查询时要使用的采样大小。
spark.cosmos.read.inferSchema.includeSystemProperties false 启用架构推理后,得到的架构是否将包含所有 Cosmos DB 系统属性
spark.cosmos.read.inferSchema.includeTimestamp false 启用架构推理后,得到的架构是否将包含文档时间戳 (_ts)。 如果 spark.cosmos.read.inferSchema.includeSystemProperties 已启用,则无需此项,因为该架构已包括所有系统属性。
spark.cosmos.read.inferSchema.forceNullableProperties false 启用架构推理后,生成的架构是否会使所有列可为 null。 默认情况下,是否将推理的列视为可为 null 取决于样本集内的任何记录是否在某个列中包含 null 值。 如果设置为 true,则会将所有列视为可为 null,即使样本集内的所有行都包含非 null 值。

Json 转换配置

配置属性名称 默认 说明
spark.cosmos.read.schemaConversionMode Relaxed 架构转换行为(RelaxedStrict)。 读取 json 文档时,如果某个文档包含未映射到架构类型的特性,用户可决定是使用 null 值(Relaxed)还是异常(Strict)。

分区策略配置

配置属性名称 默认 说明
spark.cosmos.read.partitioning.strategy Default 使用的分区策略(Default、Custom、Restrictive 或 Aggressive)
spark.cosmos.partitioning.targetedCount 目标分区计数。 此参数是可选的,除非使用 strategy==Custom,否则将被忽略。 在这种情况下,Spark 连接器不会动态计算分区数量,而是坚持使用此值。

吞吐量控制配置

配置属性名称 默认 说明
spark.cosmos.throughputControl.enabled false 吞吐量控制是否已启用
spark.cosmos.throughputControl.name 吞吐量控制组名称
spark.cosmos.throughputControl.targetThroughput 吞吐量控制组目标吞吐量
spark.cosmos.throughputControl.targetThroughputThreshold 吞吐量控制组目标吞吐量阈值
spark.cosmos.throughputControl.globalControl.database 将用于吞吐量多区域控制的数据库
spark.cosmos.throughputControl.globalControl.container 将用于吞吐量多区域控制的容器
spark.cosmos.throughputControl.globalControl.renewIntervalInMS 5s 客户端更新自身吞吐量使用情况的频率
spark.cosmos.throughputControl.globalControl.expireIntervalInMS 11s 检测脱机客户端的速度