本文介绍如何从 Azure Databricks 连接到 Azure DocumentDB,以使用 Python 和 Spark 执行常见数据作。 使用 MongoDB Spark 连接器配置必要的依赖项、建立连接并执行读取、写入、筛选和聚合作。
先决条件
Azure 订阅服务
- 如果没有 Azure 订阅,请创建 试用版
现有的 Azure DocumentDB 群集
- 如果没有群集,请 创建新群集
Azure Databricks 中的 Spark 环境
- MongoDB Spark 连接器与 Spark 3.2.1 或更高版本兼容(在 Maven 坐标
org.mongodb.spark:mongo-spark-connector_2.12:3.0.1处可用)
- MongoDB Spark 连接器与 Spark 3.2.1 或更高版本兼容(在 Maven 坐标
配置 Azure Databricks 工作区
配置 Azure Databricks 工作区以连接到 Azure DocumentDB。 将用于 Spark 的 MongoDB 连接器库添加到计算中,以启用与 Azure DocumentDB 的连接。
导航到你的 Azure Databricks 工作区。
配置可用的默认计算,或 创建新的计算资源 来运行笔记本。
选择至少支持 Spark 3.0 的 Databricks 运行时。
在计算资源中,选择“ 库>安装新>Maven”。
添加 Maven 坐标:
org.mongodb.spark:mongo-spark-connector_2.12:3.0.1选择“安装”。
安装完成后重启计算。
配置连接设置
将 Spark 配置为对所有读取和写入操作使用 Azure DocumentDB 连接字符串。
在 Azure 门户中,导航到 Azure DocumentDB 资源。
在“设置连接字符串”>下,复制连接字符串。 它具有以下格式:
mongodb+srv://<user>:<password>@<database_name>.mongocluster.cosmos.azure.cn在 Azure Databricks 中,导航到计算配置并选择 “高级选项 ”(页面底部)。
添加以下 Spark 配置变量:
-
spark.mongodb.output.uri- 粘贴连接字符串 -
spark.mongodb.input.uri- 粘贴连接字符串
-
保存配置。
或者,在读取或写入数据时,可以使用该方法直接在代码 .option() 中设置连接字符串。
创建 Python 笔记本
通过创建新的 Python 笔记本以运行数据操作。
在 Azure Databricks 工作区中,创建新的 Python 笔记本。
在笔记本开头定义连接变量:
connectionString = "mongodb+srv://<user>:<password>@<database_name>.mongocluster.cosmos.azure.cn/?tls=true&authMechanism=SCRAM-SHA-256&retrywrites=false&maxIdleTimeMS=120000" database = "<database_name>" collection = "<collection_name>"将占位符值替换为实际的数据库名称和集合名称。
从集合中读取数据
将数据从 Azure DocumentDB 集合读取到 Spark 数据帧中。
使用以下代码从集合加载数据:
df = spark.read.format("mongo").option("database", database).option("spark.mongodb.input.uri", connectionString).option("collection", collection).load()验证已成功加载的数据:
df.printSchema() display(df)观察结果。 此代码创建一个数据帧,其中包含指定集合中的所有文档,并显示架构和数据。
筛选数据
应用筛选器以从集合中检索特定数据子集。
使用 DataFrame
filter()方法来应用条件:df_filtered = df.filter(df["birth_year"] == 1970) display(df_filtered)使用列索引号:
df_filtered = df.filter(df[2] == 1970) display(df_filtered)观察结果。 此方法仅返回与筛选条件匹配的文档。
使用 SQL 查询数据
创建临时视图并针对数据运行 SQL 查询,以便进行基于 SQL 的熟悉分析。
从数据帧创建临时视图:
df.createOrReplaceTempView("T")针对视图执行 SQL 查询:
df_result = spark.sql("SELECT * FROM T WHERE birth_year == 1970 AND gender == 2") display(df_result)观察结果。 此方法允许对复杂的查询和联接使用标准 SQL 语法。
将数据写入集合
通过将 DataFrame 写入 Azure DocumentDB 集合来保存新的或修改的数据。
使用以下代码将数据写入集合:
df.write.format("mongo").option("spark.mongodb.output.uri", connectionString).option("database", database).option("collection", "CitiBike2019").mode("append").save()写入作无需输出即可完成。 通过从集合中读取数据来验证写入作是否已成功完成:
df_verify = spark.read.format("mongo").option("database", database).option("spark.mongodb.input.uri", connectionString).option("collection", "CitiBike2019").load() display(df_verify)小窍门
使用不同的写入模式,例如
append,overwrite或ignore取决于你的要求。
运行数据聚合管道
执行聚合管道,直接在 Azure DocumentDB 中执行服务器端数据处理和分析。 聚合管道支持强大的数据转换、分组和计算,而无需将数据移出数据库。 它们非常适合实时分析、仪表板和报表生成。
将聚合管道定义为 JSON 字符串:
pipeline = "[{ $group : { _id : '$birth_year', totaldocs : { $count : 1 }, totalduration: {$sum: '$tripduration'}} }]"执行管道并加载结果:
df_aggregated = spark.read.format("mongo").option("database", database).option("spark.mongodb.input.uri", connectionString).option("collection", collection).option("pipeline", pipeline).load() display(df_aggregated)
相关内容
- Maven central - MongoDB Spark 连接器版本
- 实用 MongoDB 聚合 - 聚合管道指南
- 配置防火墙设置