从 Azure Databricks 连接到 Azure Cosmos DB for MongoDB vCore

本文介绍了如何从 Azure Databricks 连接到 Azure Cosmos DB MongoDB vCore。 它使用 Python 代码演练了基本数据操作语言 (DML) 操作,例如读取、筛选、SQL、聚合管道和写入表。

先决条件

配置连接的依赖项

以下是从 Azure Databricks 连接到 Azure Cosmos DB for MongoDB vCore 所需的依赖项:

  • 适用于 MongoDB 的 Spark 连接器:Spark 连接器用于连接到 Azure Cosmos DB for MongoDB vCore。 请识别并使用 Maven 中心内与 Spark 环境的 Spark 和 Scala 版本兼容的连接器版本。 建议使用支持 Spark 3.2.1 或更高版本的环境,并且 Spark 连接器在 maven 坐标 org.mongodb.spark:mongo-spark-connector_2.12:3.0.1 上可用。

  • Azure Cosmos DB for MongoDB 连接字符串:Azure Cosmos DB for MongoDB vCore 连接字符串、用户名和密码。

预配 Azure Databricks 群集

可以按说明来预配 Azure Databricks 群集。 建议选择支持 Spark 3.0 的 Databricks 运行时版本 7.6。

创建 Databricks 新群集的示意图。

添加依赖项

将适用于 Spark 库的 MongoDB 连接器添加到群集,从而连接到本机 MongoDB 和 Azure Cosmos DB for MongoDB 终结点。 在群集中,选择“库”>“安装新库”>“Maven”,然后添加 org.mongodb.spark:mongo-spark-connector_2.12:3.0.1 Maven 坐标 。

添加 Databricks 群集依赖项的示意图。

选择“安装”,然后在安装完成后重启群集。

注意

请确保在安装适用于 Spark 的 MongoDB 连接器库之后重启 Databricks 群集。

之后,你可以创建用于迁移的 Scala 或 Python 笔记本。

创建 Python 笔记本以连接到 Azure Cosmos DB for MongoDB vCore

在 Databricks 中创建 Python 笔记本。 运行以下代码之前,请确保为变量输入正确的值。

使用 Azure Cosmos DB for MongoDB 连接字符串更新 Spark 配置

  1. 请记下 Azure 门户中 Azure Cosmos DB MongoDB vCore 资源中的“设置”->“连接字符串”下的连接字符串。 它的形式是“mongodb+srv://<user>:<password>@<database_name>.mongocluster.cosmos.azure.com”
  2. 回到群集配置中的 Databricks,在“高级选项”(页面底部)下,粘贴 spark.mongodb.output.urispark.mongodb.input.uri 变量的连接字符串。 适当地填充用户名和密码字段。 这样,所有在群集上运行的工作簿都使用此配置。
  3. 或者,在调用 API 时可以显式设置 option,例如:spark.read.format("mongo").option("spark.mongodb.input.uri", connectionString).load()。 如果在群集中配置变量,则无需设置该选项。
connectionString_vcore="mongodb+srv://<user>:<password>@<database_name>.mongocluster.cosmos.azure.com/?tls=true&authMechanism=SCRAM-SHA-256&retrywrites=false&maxIdleTimeMS=120000"
database="<database_name>"
collection="<collection_name>"

数据示例集

出于此实验室的目的,我们将使用 CSV“Citibike2019”数据集。 可以导入它:CitiBike Trip History 2019。 我们已将其加载到名为“CitiBikeDB”的数据库和“CitiBike2019”集合。 我们将变量数据库和集合设置为指向已加载的数据,并在示例中使用变量。

database="CitiBikeDB"
collection="CitiBike2019"

从 Azure Cosmos DB for MongoDB vCore 读取数据

常规语法看起来像这样:

df_vcore = spark.read.format("mongo").option("database", database).option("spark.mongodb.input.uri", connectionString_vcore).option("collection",collection).load()

可以验证已加载的数据帧,如下所示:

df_vcore.printSchema()
display(df_vcore)

请看以下示例:

df_vcore = spark.read.format("mongo").option("database", database).option("spark.mongodb.input.uri", connectionString_vcore).option("collection",collection).load()
df_vcore.printSchema()
display(df_vcore)

输出:

架构打印方案的屏幕截图。

数据帧显示数据帧的屏幕截图。

筛选来自 Azure Cosmos DB for MongoDB vCore 的数据

常规语法看起来像这样:

df_v = df_vcore.filter(df_vcore[column number/column name] == [filter condition])
display(df_v)

请看以下示例:

df_v = df_vcore.filter(df_vcore[2] == 1970)
display(df_v)

输出:显示已筛选的数据帧的屏幕截图。

创建视图或临时表,并对其运行 SQL 查询

常规语法看起来像这样:

df_[dataframename].createOrReplaceTempView("[View Name]")
spark.sql("SELECT * FROM [View Name]")

请看以下示例:

df_vcore.createOrReplaceTempView("T_VCORE")
df_v = spark.sql(" SELECT * FROM T_VCORE WHERE birth_year == 1970 and gender == 2 ")
display(df_v)

输出:显示 SQL 查询的屏幕截图。

将数据写入 Azure Cosmos DB for MongoDB vCore

常规语法看起来像这样:

df.write.format("mongo").option("spark.mongodb.output.uri", connectionString).option("database",database).option("collection","<collection_name>").mode("append").save()

请看以下示例:

df_vcore.write.format("mongo").option("spark.mongodb.output.uri", connectionString_vcore).option("database",database).option("collection","CitiBike2019").mode("append").save()

此命令没有输出,因为它直接写入集合。 可以使用 read 命令交叉检查记录是否已更新。

从运行聚合管道的 Azure Cosmos DB for MongoDB vCore 集合读取数据

[!注意] 聚合管道是一项强大的功能,可用于预处理和转换 Azure Cosmos DB for MongoDB 中的数据。 它非常适合实时分析、仪表板、报表生成和汇总、提供“服务器端”数据后处理的求和与求平均值。 (注意:有一整本书讲解了此内容)。

Azure Cosmos DB for MongoDB 甚至支持丰富的辅助/复合索引,只提取、筛选和处理它需要的数据。

例如,直接在数据库中分析特定地理位置的所有客户,而无需先加载完整的数据集,从而最大限度地减少数据移动并降低延迟。

下面是使用聚合函数的示例:

pipeline = "[{ $group : { _id : '$birth_year', totaldocs : { $count : 1 }, totalduration: {$sum: '$tripduration'}} }]"
df_vcore = spark.read.format("mongo").option("database", database).option("spark.mongodb.input.uri", connectionString_vcore).option("collection",collection).option("pipeline", pipeline).load()
display(df_vcore)

输出:

显示聚合数据的屏幕截图。

以下文章演示如何在 Azure Cosmos DB for MongoDB vCore 中使用聚合管道: