教程:使用 Spark 连接到 Azure Cosmos DB for NoSQL

适用范围: NoSQL

本教程使用 Azure Cosmos DB Spark 连接器从 Azure Cosmos DB for NoSQL 帐户读取或写入数据。 本教程使用 Azure Databricks 和 Jupyter 笔记本来说明如何从 Spark 与 API for NoSQL 集成。 本教程重点介绍 Python 和 Scala,不过可以使用 Spark 支持的任何语言或界面。

本教程介绍如何执行下列操作:

  • 使用 Spark 和 Jupyter 笔记本连接到 API for NoSQL 帐户
  • 创建数据库和容器资源
  • 将数据引入容器
  • 查询容器中的数据
  • 对容器中的项执行常见操作

先决条件

  • 一个现有的适用于 NoSQL 的 Azure Cosmos DB 帐户。
  • 现有的 Azure Databricks 工作区。

使用 Spark 和 Jupyter 进行连接

使用现有的 Azure Databricks 工作区创建一个可以使用 Apache Spark 3.4.x 连接到 Azure Cosmos DB for NoSQL 帐户的计算群集。

  1. 打开 Azure Databricks 工作区。

  2. 在工作区界面中,创建新的群集。 至少使用以下设置配置群集:

    运行时版本 13.3 LTS(Scala 2.12、Spark 3.4.1)
  3. 使用工作区界面从 Maven Central 中搜索组 ID 为 com.azure.cosmos.spark 的 Maven 包。 在群集中安装工件 ID 前缀为 azure-cosmos-spark_3-4 的 Spark 3.4 专用包

  4. 最后,创建新的笔记本

    提示

    默认情况下,笔记本将附加到最近创建的群集。

  5. 在笔记本中,设置 NoSQL 帐户终结点的 OLTP 配置设置、数据库名称和容器名称。

    # Set configuration settings
    config = {
      "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>",
      "spark.cosmos.accountKey": "<nosql-account-key>",
      "spark.cosmos.database": "cosmicworks",
      "spark.cosmos.container": "products"
    }
    
    # Set configuration settings
    val config = Map(
      "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>",
      "spark.cosmos.accountKey" -> "<nosql-account-key>",
      "spark.cosmos.database" -> "cosmicworks",
      "spark.cosmos.container" -> "products"
    )
    

创建数据库和容器

使用目录 API 管理帐户资源,例如数据库和容器。 然后,可以使用 OLTP 管理容器资源中的数据。

  1. 配置目录 API 以使用 Spark 管理 API for NoSQL 资源。

    # Configure Catalog Api    
    spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"])
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"]) 
    
    // Configure Catalog Api  
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint"))
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
    
  2. 使用 CREATE DATABASE IF NOT EXISTS 创建名为 cosmicworks 的新数据库。

    # Create a database using the Catalog API    
    spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
    // Create a database using the Catalog API  
    spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
  3. 使用 CREATE TABLE IF NOT EXISTS 创建名为 products 的新容器。 确保将分区键路径设置为 /category 并启用自动缩放吞吐量,最大吞吐量为每秒 1000 请求单位数 (RU/s)。

    # Create a products container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
    // Create a products container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
  4. 使用分层分区键配置创建另一个名为 employees 的容器,并将 /organization/department/team 按该特定顺序作为分区键路径集。 此外,将吞吐量设置为手动控制的量 400 RU/s

    # Create an employees container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
    // Create an employees container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
  5. 运行笔记本单元格以验证数据库和容器是否已在 API for NoSQL 帐户中创建。

引入数据

创建示例数据集,然后使用 OLTP 将数据引入到 API for NoSQL 容器。

  1. 创建示例数据集。

    # Create sample data    
    products = (
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True)
    )
    
    // Create sample data
    val products = Seq(
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true)
    )
    
  2. 使用 spark.createDataFrame 和以前保存的 OLTP 配置将示例数据添加到目标容器。

    # Ingest sample data    
    spark.createDataFrame(products) \
      .toDF("id", "category", "name", "quantity", "price", "clearance") \
      .write \
      .format("cosmos.oltp") \
      .options(**config) \
      .mode("APPEND") \
      .save()
    
    // Ingest sample data
    spark.createDataFrame(products)
      .toDF("id", "category", "name", "quantity", "price", "clearance")
      .write
      .format("cosmos.oltp")
      .options(config)
      .mode("APPEND")
      .save()
    

查询数据

将 OLTP 数据加载到数据帧中,以对数据执行常见查询。 可以使用各种语法筛选或查询数据。

  1. 使用 spark.read 将 OLTP 数据加载到数据帧对象中。 使用本教程前面使用的相同配置。 此外,将 spark.cosmos.read.inferSchema.enabled 设置为 true 以允许 Spark 连接器通过采样现有项来推断架构。

    # Load data    
    df = spark.read.format("cosmos.oltp") \
      .options(**config) \
      .option("spark.cosmos.read.inferSchema.enabled", "true") \
      .load()
    
    // Load data
    val df = spark.read.format("cosmos.oltp")
      .options(config)
      .option("spark.cosmos.read.inferSchema.enabled", "true")
      .load()
    
  2. 使用 printSchema. 呈现在数据帧中加载的数据的架构。

    # Render schema    
    df.printSchema()
    
    // Render schema    
    df.printSchema()
    
  3. 呈现 quantity 列小于 20 的数据行。 使用 whereshow 函数执行此查询。

    # Render filtered data    
    df.where("quantity < 20") \
      .show()
    
    // Render filtered data
    df.where("quantity < 20")
      .show()
    
  4. 呈现 clearance 列为 true 的第一个数据行。 使用 filter 函数执行此查询。

    # Render 1 row of flitered data    
    df.filter(df.clearance == True) \
      .show(1)
    
    // Render 1 row of flitered data
    df.filter($"clearance" === true)
      .show(1)
    
  5. 呈现五行数据,且无筛选器或截断。 使用 show 函数自定义所呈现行的外观和行数。

    # Render five rows of unfiltered and untruncated data    
    df.show(5, False)
    
    // Render five rows of unfiltered and untruncated data    
    df.show(5, false)
    
  6. 使用此原始 NoSQL 查询字符串查询数据:SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800

    # Render results of raw query    
    rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    rawDf = spark.sql(rawQuery)
    rawDf.show()
    
    // Render results of raw query    
    val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    val rawDf = spark.sql(rawQuery)
    rawDf.show()
    

执行常见操作

在 Spark 中使用 API for NoSQL 数据时,可以执行部分更新或使用数据作为原始 JSON。

  1. 若要对项执行部分更新,请执行以下步骤:

    1. 复制现有 config 配置变量并修改新副本中的属性。 具体来说;将写入策略配置为 ItemPatch,禁用批量支持,设置列和映射操作,最后将默认操作类型设置为 Set

      # Copy and modify configuration
      configPatch = dict(config)
      configPatch["spark.cosmos.write.strategy"] = "ItemPatch"
      configPatch["spark.cosmos.write.bulk.enabled"] = "false"
      configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set"
      configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
      
      // Copy and modify configuration
      val configPatch = scala.collection.mutable.Map.empty ++ config
      configPatch ++= Map(
        "spark.cosmos.write.strategy" -> "ItemPatch",
        "spark.cosmos.write.bulk.enabled" -> "false",
        "spark.cosmos.write.patch.defaultOperationType" -> "Set",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]"
      )
      
    2. 为要作为此修补操作目标的项分区键和唯一标识符创建变量。

      # Specify target item id and partition key
      targetItemId = "68719518391"
      targetItemPartitionKey = "gear-surf-surfboards"
      
      // Specify target item id and partition key
      val targetItemId = "68719518391"
      val targetItemPartitionKey = "gear-surf-surfboards"
      
    3. 创建一组修补程序对象以指定目标项并指定应修改的字段。

      # Create set of patch diffs
      patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
      
      // Create set of patch diffs
      val patchProducts = Seq(
        (targetItemId, targetItemPartitionKey, "Yamba New Surfboard")
      )
      
    4. 使用该组修补程序对象创建数据帧,并使用 write 执行修补操作。

      # Create data frame
      spark.createDataFrame(patchProducts) \
        .write \
        .format("cosmos.oltp") \
        .options(**configPatch) \
        .mode("APPEND") \
        .save()
      
      // Create data frame
      patchProducts
        .toDF("id", "category", "name")
        .write
        .format("cosmos.oltp")
        .options(configPatch)
        .mode("APPEND")
        .save()
      
    5. 运行查询以查看修补操作的结果。 现在,该项应命名为 Yamba New Surfboard,无需进行其他更改。

      # Create and run query
      patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'"
      patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
      // Create and run query
      val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'"
      val patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
  2. 若要使用原始 JSON 数据,请执行以下步骤:

    1. 复制现有 config 配置变量并修改新副本中的属性。 具体来说,将目标容器更改为 employees 并配置 contacts 列/字段以使用原始 JSON 数据。

      # Copy and modify configuration
      configRawJson = dict(config)
      configRawJson["spark.cosmos.container"] = "employees"
      configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
      
      // Copy and modify configuration
      val configRawJson = scala.collection.mutable.Map.empty ++ config
      configRawJson ++= Map(
        "spark.cosmos.container" -> "employees",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]"
      )
      
    2. 创建一组员工以引入容器。

      # Create employee data
      employees = (
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), 
      )
      
      // Create employee data
      val employees = Seq(
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""")
      )
      
    3. 创建数据帧并使用 write 引入员工数据。

      # Ingest data
      spark.createDataFrame(employees) \
        .toDF("id", "organization", "department", "team", "name", "contacts") \
        .write \
        .format("cosmos.oltp") \
        .options(**configRawJson) \
        .mode("APPEND") \
        .save()
      
      // Ingest data
      spark.createDataFrame(employees)
        .toDF("id", "organization", "department", "team", "name", "contacts")
        .write
        .format("cosmos.oltp")
        .options(configRawJson)
        .mode("APPEND")
        .save()
      
    4. 使用 show 呈现数据帧中的数据。 请注意,contacts 列是输出中的原始 JSON。

      # Read and render data
      rawJsonDf = spark.read.format("cosmos.oltp") \
        .options(**configRawJson) \
        .load()
      rawJsonDf.show()
      
      // Read and render data
      val rawJsonDf = spark.read.format("cosmos.oltp")
        .options(configRawJson)
        .load()
      rawJsonDf.show()
      

下一步