从 Azure Databricks 连接到 Azure DocumentDB

本文介绍如何从 Azure Databricks 连接到 Azure DocumentDB,以使用 Python 和 Spark 执行常见数据作。 使用 MongoDB Spark 连接器配置必要的依赖项、建立连接并执行读取、写入、筛选和聚合作。

先决条件

  • Azure 订阅服务

    • 如果没有 Azure 订阅,请创建 试用版
  • 现有的 Azure DocumentDB 群集

配置 Azure Databricks 工作区

配置 Azure Databricks 工作区以连接到 Azure DocumentDB。 将用于 Spark 的 MongoDB 连接器库添加到计算中,以启用与 Azure DocumentDB 的连接。

  1. 导航到你的 Azure Databricks 工作区。

  2. 配置可用的默认计算,或 创建新的计算资源 来运行笔记本。

  3. 选择至少支持 Spark 3.0 的 Databricks 运行时。

  4. 在计算资源中,选择“ >安装新>Maven”。

  5. 添加 Maven 坐标: org.mongodb.spark:mongo-spark-connector_2.12:3.0.1

  6. 选择“安装”。

  7. 安装完成后重启计算。

配置连接设置

将 Spark 配置为对所有读取和写入操作使用 Azure DocumentDB 连接字符串。

  1. 在 Azure 门户中,导航到 Azure DocumentDB 资源。

  2. “设置连接字符串”>下,复制连接字符串。 它具有以下格式: mongodb+srv://<user>:<password>@<database_name>.mongocluster.cosmos.azure.cn

  3. 在 Azure Databricks 中,导航到计算配置并选择 “高级选项 ”(页面底部)。

  4. 添加以下 Spark 配置变量:

    • spark.mongodb.output.uri - 粘贴连接字符串
    • spark.mongodb.input.uri - 粘贴连接字符串
  5. 保存配置。

或者,在读取或写入数据时,可以使用该方法直接在代码 .option() 中设置连接字符串。

创建 Python 笔记本

通过创建新的 Python 笔记本以运行数据操作。

  1. 在 Azure Databricks 工作区中,创建新的 Python 笔记本。

  2. 在笔记本开头定义连接变量:

    connectionString = "mongodb+srv://<user>:<password>@<database_name>.mongocluster.cosmos.azure.cn/?tls=true&authMechanism=SCRAM-SHA-256&retrywrites=false&maxIdleTimeMS=120000"
    database = "<database_name>"
    collection = "<collection_name>"
    
  3. 将占位符值替换为实际的数据库名称和集合名称。

从集合中读取数据

将数据从 Azure DocumentDB 集合读取到 Spark 数据帧中。

  1. 使用以下代码从集合加载数据:

    df = spark.read.format("mongo").option("database", database).option("spark.mongodb.input.uri", connectionString).option("collection", collection).load()
    
  2. 验证已成功加载的数据:

    df.printSchema()
    display(df)
    
  3. 观察结果。 此代码创建一个数据帧,其中包含指定集合中的所有文档,并显示架构和数据。

筛选数据

应用筛选器以从集合中检索特定数据子集。

  1. 使用 DataFrame filter() 方法来应用条件:

    df_filtered = df.filter(df["birth_year"] == 1970)
    display(df_filtered)
    
  2. 使用列索引号:

    df_filtered = df.filter(df[2] == 1970)
    display(df_filtered)
    
  3. 观察结果。 此方法仅返回与筛选条件匹配的文档。

使用 SQL 查询数据

创建临时视图并针对数据运行 SQL 查询,以便进行基于 SQL 的熟悉分析。

  1. 从数据帧创建临时视图:

    df.createOrReplaceTempView("T")
    
  2. 针对视图执行 SQL 查询:

    df_result = spark.sql("SELECT * FROM T WHERE birth_year == 1970 AND gender == 2")
    display(df_result)
    
  3. 观察结果。 此方法允许对复杂的查询和联接使用标准 SQL 语法。

将数据写入集合

通过将 DataFrame 写入 Azure DocumentDB 集合来保存新的或修改的数据。

  1. 使用以下代码将数据写入集合:

    df.write.format("mongo").option("spark.mongodb.output.uri", connectionString).option("database", database).option("collection", "CitiBike2019").mode("append").save()
    
  2. 写入作无需输出即可完成。 通过从集合中读取数据来验证写入作是否已成功完成:

    df_verify = spark.read.format("mongo").option("database", database).option("spark.mongodb.input.uri", connectionString).option("collection", "CitiBike2019").load()
    display(df_verify)
    

    小窍门

    使用不同的写入模式,例如 appendoverwriteignore 取决于你的要求。

运行数据聚合管道

执行聚合管道,直接在 Azure DocumentDB 中执行服务器端数据处理和分析。 聚合管道支持强大的数据转换、分组和计算,而无需将数据移出数据库。 它们非常适合实时分析、仪表板和报表生成。

  1. 将聚合管道定义为 JSON 字符串:

    pipeline = "[{ $group : { _id : '$birth_year', totaldocs : { $count : 1 }, totalduration: {$sum: '$tripduration'}} }]"
    
  2. 执行管道并加载结果:

    df_aggregated = spark.read.format("mongo").option("database", database).option("spark.mongodb.input.uri", connectionString).option("collection", collection).option("pipeline", pipeline).load()
    display(df_aggregated)