从 Azure Databricks 连接到 Azure Cosmos DB for MongoDB vCore
本文介绍了如何从 Azure Databricks 连接到 Azure Cosmos DB MongoDB vCore。 它使用 Python 代码演练了基本数据操作语言 (DML) 操作,例如读取、筛选、SQL、聚合管道和写入表。
先决条件
预配所选的 Spark 环境 Azure Databricks。
配置连接的依赖项
以下是从 Azure Databricks 连接到 Azure Cosmos DB for MongoDB vCore 所需的依赖项:
适用于 MongoDB 的 Spark 连接器:Spark 连接器用于连接到 Azure Cosmos DB for MongoDB vCore。 请识别并使用 Maven 中心内与 Spark 环境的 Spark 和 Scala 版本兼容的连接器版本。 建议使用支持 Spark 3.2.1 或更高版本的环境,并且 Spark 连接器在 maven 坐标
org.mongodb.spark:mongo-spark-connector_2.12:3.0.1
上可用。Azure Cosmos DB for MongoDB 连接字符串:Azure Cosmos DB for MongoDB vCore 连接字符串、用户名和密码。
预配 Azure Databricks 群集
可以按说明来预配 Azure Databricks 群集。 建议选择支持 Spark 3.0 的 Databricks 运行时版本 7.6。
添加依赖项
将适用于 Spark 库的 MongoDB 连接器添加到群集,从而连接到本机 MongoDB 和 Azure Cosmos DB for MongoDB 终结点。 在群集中,选择“库”>“安装新库”>“Maven”,然后添加 org.mongodb.spark:mongo-spark-connector_2.12:3.0.1
Maven 坐标 。
选择“安装”,然后在安装完成后重启群集。
注意
请确保在安装适用于 Spark 的 MongoDB 连接器库之后重启 Databricks 群集。
之后,你可以创建用于迁移的 Scala 或 Python 笔记本。
创建 Python 笔记本以连接到 Azure Cosmos DB for MongoDB vCore
在 Databricks 中创建 Python 笔记本。 运行以下代码之前,请确保为变量输入正确的值。
使用 Azure Cosmos DB for MongoDB 连接字符串更新 Spark 配置
- 请记下 Azure 门户中 Azure Cosmos DB MongoDB vCore 资源中的“设置”->“连接字符串”下的连接字符串。 它的形式是“mongodb+srv://<user>:<password>@<database_name>.mongocluster.cosmos.azure.com”
- 回到群集配置中的 Databricks,在“高级选项”(页面底部)下,粘贴
spark.mongodb.output.uri
和spark.mongodb.input.uri
变量的连接字符串。 适当地填充用户名和密码字段。 这样,所有在群集上运行的工作簿都使用此配置。 - 或者,在调用 API 时可以显式设置
option
,例如:spark.read.format("mongo").option("spark.mongodb.input.uri", connectionString).load()
。 如果在群集中配置变量,则无需设置该选项。
connectionString_vcore="mongodb+srv://<user>:<password>@<database_name>.mongocluster.cosmos.azure.com/?tls=true&authMechanism=SCRAM-SHA-256&retrywrites=false&maxIdleTimeMS=120000"
database="<database_name>"
collection="<collection_name>"
数据示例集
出于此实验室的目的,我们将使用 CSV“Citibike2019”数据集。 可以导入它:CitiBike Trip History 2019。 我们已将其加载到名为“CitiBikeDB”的数据库和“CitiBike2019”集合。 我们将变量数据库和集合设置为指向已加载的数据,并在示例中使用变量。
database="CitiBikeDB"
collection="CitiBike2019"
从 Azure Cosmos DB for MongoDB vCore 读取数据
常规语法看起来像这样:
df_vcore = spark.read.format("mongo").option("database", database).option("spark.mongodb.input.uri", connectionString_vcore).option("collection",collection).load()
可以验证已加载的数据帧,如下所示:
df_vcore.printSchema()
display(df_vcore)
请看以下示例:
df_vcore = spark.read.format("mongo").option("database", database).option("spark.mongodb.input.uri", connectionString_vcore).option("collection",collection).load()
df_vcore.printSchema()
display(df_vcore)
输出:
架构
数据帧
筛选来自 Azure Cosmos DB for MongoDB vCore 的数据
常规语法看起来像这样:
df_v = df_vcore.filter(df_vcore[column number/column name] == [filter condition])
display(df_v)
请看以下示例:
df_v = df_vcore.filter(df_vcore[2] == 1970)
display(df_v)
输出:
创建视图或临时表,并对其运行 SQL 查询
常规语法看起来像这样:
df_[dataframename].createOrReplaceTempView("[View Name]")
spark.sql("SELECT * FROM [View Name]")
请看以下示例:
df_vcore.createOrReplaceTempView("T_VCORE")
df_v = spark.sql(" SELECT * FROM T_VCORE WHERE birth_year == 1970 and gender == 2 ")
display(df_v)
输出:
将数据写入 Azure Cosmos DB for MongoDB vCore
常规语法看起来像这样:
df.write.format("mongo").option("spark.mongodb.output.uri", connectionString).option("database",database).option("collection","<collection_name>").mode("append").save()
请看以下示例:
df_vcore.write.format("mongo").option("spark.mongodb.output.uri", connectionString_vcore).option("database",database).option("collection","CitiBike2019").mode("append").save()
此命令没有输出,因为它直接写入集合。 可以使用 read 命令交叉检查记录是否已更新。
从运行聚合管道的 Azure Cosmos DB for MongoDB vCore 集合读取数据
[!注意] 聚合管道是一项强大的功能,可用于预处理和转换 Azure Cosmos DB for MongoDB 中的数据。 它非常适合实时分析、仪表板、报表生成和汇总、提供“服务器端”数据后处理的求和与求平均值。 (注意:有一整本书讲解了此内容)。
Azure Cosmos DB for MongoDB 甚至支持丰富的辅助/复合索引,只提取、筛选和处理它需要的数据。
例如,直接在数据库中分析特定地理位置的所有客户,而无需先加载完整的数据集,从而最大限度地减少数据移动并降低延迟。
下面是使用聚合函数的示例:
pipeline = "[{ $group : { _id : '$birth_year', totaldocs : { $count : 1 }, totalduration: {$sum: '$tripduration'}} }]"
df_vcore = spark.read.format("mongo").option("database", database).option("spark.mongodb.input.uri", connectionString_vcore).option("collection",collection).option("pipeline", pipeline).load()
display(df_vcore)
输出:
相关内容
以下文章演示如何在 Azure Cosmos DB for MongoDB vCore 中使用聚合管道: