Accelerate big data analytics by using the Apache Spark to Azure Cosmos DB connector

You can use the Cosmos DB Spark connector to run Spark jobs against data stored in Azure Cosmos DB. Cosmos DB can be used for batch and stream processing, and as a serving layer for low-latency access.

You can use the connector with Azure HDInsight, which provides managed Spark clusters on Azure. The following table shows the supported Spark versions.

Component       Version
Apache Spark    2.4.x, 2.3.x, 2.2.x, and 2.1.x
Scala           2.11

Warning

This connector supports the core (SQL) API of Azure Cosmos DB. For the Cosmos DB for MongoDB API, use the MongoDB Spark connector. For the Cosmos DB Cassandra API, use the Cassandra Spark connector.

Quickstart

  • Follow the steps at Get started with the Java SDK to set up a Cosmos DB account and populate some data.

  • You can now create a new notebook and import the Cosmos DB connector library. Jump to Working with the connector for details on how to set up your workspace.

  • The following sections have snippets that show how to read and write data by using the connector.

Batch reads from Cosmos DB

The following snippet shows how to create a Spark DataFrame that reads from Cosmos DB in PySpark.

# Read Configuration
readConfig = {
    "Endpoint": "https://doctorwho.documents.azure.cn:443/",
    "Masterkey": "YOUR-KEY-HERE",
    "Database": "DepartureDelays",
    "Collection": "flights_pcoll",
    "query_custom": "SELECT c.date, c.delay, c.distance, c.origin, c.destination FROM c WHERE c.origin = 'SEA'" // Optional
}

# Connect via azure-cosmosdb-spark to create Spark DataFrame
flights = spark.read.format(
    "com.microsoft.azure.cosmosdb.spark").options(**readConfig).load()
flights.count()

And the same code snippet in Scala:

// Import Necessary Libraries
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark._
import com.microsoft.azure.cosmosdb.spark.config.Config

// Read Configuration
val readConfig = Config(Map(
  "Endpoint" -> "https://doctorwho.documents.azure.cn:443/",
  "Masterkey" -> "YOUR-KEY-HERE",
  "Database" -> "DepartureDelays",
  "Collection" -> "flights_pcoll",
  "query_custom" -> "SELECT c.date, c.delay, c.distance, c.origin, c.destination FROM c WHERE c.origin = 'SEA'" // Optional
))

// Connect via azure-cosmosdb-spark to create Spark DataFrame
val flights = spark.read.cosmosDB(readConfig)
flights.count()
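
Once loaded, the result is a regular Spark DataFrame. As a quick usage sketch in PySpark (the view name here is arbitrary), you can register it as a temporary view and query it with Spark SQL:

# Register the DataFrame as a temporary view for Spark SQL
flights.createOrReplaceTempView("flights")

# Example: average departure delay by destination for flights leaving Seattle
spark.sql("""
    SELECT destination, AVG(delay) AS avg_delay
    FROM flights
    GROUP BY destination
    ORDER BY avg_delay DESC
""").show()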

Batch writes to Cosmos DB

The following snippet shows how to write a DataFrame to Cosmos DB in PySpark.

# Write configuration
writeConfig = {
    "Endpoint": "https://doctorwho.documents.azure.cn:443/",
    "Masterkey": "YOUR-KEY-HERE",
    "Database": "DepartureDelays",
    "Collection": "flights_fromsea",
    "Upsert": "true"
}

# Write to Cosmos DB from the flights DataFrame
flights.write.format("com.microsoft.azure.cosmosdb.spark").options(
    **writeConfig).save()

And the same code snippet in Scala:

// Write configuration
val writeConfig = Config(Map(
  "Endpoint" -> "https://doctorwho.documents.azure.cn:443/",
  "Masterkey" -> "YOUR-KEY-HERE",
  "Database" -> "DepartureDelays",
  "Collection" -> "flights_fromsea",
  "Upsert" : "true"
))

// Write to Cosmos DB from the flights DataFrame
import org.apache.spark.sql.SaveMode
flights.write.mode(SaveMode.Overwrite).cosmosDB(writeConfig)
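
For an end-to-end sketch in PySpark (hypothetical; it assumes the flights DataFrame and writeConfig defined above), you could derive the subset to persist and then upsert it:

# Derive the subset to persist: flights departing Seattle
flights_fromsea = flights.filter(flights["origin"] == "SEA")

# Upsert the subset into the target collection
flights_fromsea.write.format("com.microsoft.azure.cosmosdb.spark") \
    .mode("overwrite") \
    .options(**writeConfig) \
    .save()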

Streaming reads from Cosmos DB

The following snippet shows how to connect to and read from the Azure Cosmos DB change feed.

# Read Configuration
readConfig = {
    "Endpoint": "https://doctorwho.documents.azure.cn:443/",
    "Masterkey": "YOUR-KEY-HERE",
    "Database": "DepartureDelays",
    "Collection": "flights_pcoll",
    "ReadChangeFeed": "true",
    "ChangeFeedQueryName": "Departure-Delays",
    "ChangeFeedStartFromTheBeginning": "false",
    "InferStreamSchema": "true",
    "ChangeFeedCheckpointLocation": "dbfs:/Departure-Delays"
}


# Open a read stream to the Cosmos DB Change Feed via azure-cosmosdb-spark to create Spark DataFrame
changes = (spark
           .readStream
           .format("com.microsoft.azure.cosmosdb.spark.streaming.CosmosDBSourceProvider")
           .options(**readConfig)
           .load())

And the same code snippet in Scala:

// Import Necessary Libraries
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark._
import com.microsoft.azure.cosmosdb.spark.config.Config
import com.microsoft.azure.cosmosdb.spark.streaming.CosmosDBSourceProvider

// Read Configuration
val readConfig = Config(Map(
  "Endpoint" -> "https://doctorwho.documents.azure.cn:443/",
  "Masterkey" -> "YOUR-KEY-HERE",
  "Database" -> "DepartureDelays",
  "Collection" -> "flights_pcoll",
  "ReadChangeFeed" -> "true",
  "ChangeFeedQueryName" -> "Departure-Delays",
  "ChangeFeedStartFromTheBeginning" -> "false",
  "InferStreamSchema" -> "true",
  "ChangeFeedCheckpointLocation" -> "dbfs:/Departure-Delays"
))

// Open a read stream to the Cosmos DB Change Feed via azure-cosmosdb-spark to create Spark DataFrame
val df = spark.readStream
  .format(classOf[CosmosDBSourceProvider].getName)
  .options(readConfig)
  .load()
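
Before wiring up a sink, you can verify the stream by echoing it to the console. A minimal PySpark sketch (the checkpoint path is arbitrary), using the changes DataFrame from above:

# Debugging aid: print change feed micro-batches to the console
consoleQuery = (changes
                .writeStream
                .format("console")
                .outputMode("append")
                .option("checkpointLocation", "/tmp/changefeed_console")
                .start())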

Streaming writes to Cosmos DB

The following snippet shows how to write the streaming DataFrame to Cosmos DB in PySpark.

# Write configuration
writeConfig = {
    "Endpoint": "https://doctorwho.documents.azure.cn:443/",
    "Masterkey": "YOUR-KEY-HERE",
    "Database": "DepartureDelays",
    "Collection": "flights_fromsea",
    "Upsert": "true",
    "WritingBatchSize": "500",
    "CheckpointLocation": "/checkpointlocation_write1"
}

# Write to Cosmos DB from the changes DataFrame
changeFeed = (changes
              .writeStream
              .format("com.microsoft.azure.cosmosdb.spark.streaming.CosmosDBSinkProvider")
              .outputMode("append")
              .options(**writeConfig)
              .start())

And the same code snippet in Scala:

// Write configuration
val writeConfig = Config(Map(
  "Endpoint" -> "https://doctorwho.documents.azure.cn:443/",
  "Masterkey" -> "YOUR-KEY-HERE",
  "Database" -> "DepartureDelays",
  "Collection" -> "flights_fromsea",
  "Upsert" -> "true",
  "WritingBatchSize" -> "500",
  "CheckpointLocation" -> "/checkpointlocation_write1"
))

// Write to Cosmos DB from the streaming DataFrame
import com.microsoft.azure.cosmosdb.spark.streaming.CosmosDBSinkProvider

df.writeStream
  .format(classOf[CosmosDBSinkProvider].getName)
  .options(writeConfig)
  .start()
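
Note that start() returns immediately with a StreamingQuery handle; the stream runs in the background. In a standalone job you would typically block on the handle returned earlier (a PySpark sketch using changeFeed from above):

# Keep the job alive while the stream runs; call changeFeed.stop() to shut down
changeFeed.awaitTermination()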

For more snippets and end-to-end samples, see Jupyter.

Working with the connector

You can build the connector from the source code in GitHub, or download the uber jars from Maven by using the links below.

Spark    Scala    Latest version
2.4.0    2.11     azure-cosmosdb-spark_2.4.0_2.11_1.4.0
2.3.0    2.11     azure-cosmosdb-spark_2.3.0_2.11_1.3.3
2.2.0    2.11     azure-cosmosdb-spark_2.2.0_2.11_1.1.1
2.1.0    2.11     azure-cosmosdb-spark_2.1.0_2.11_1.2.2

Using spark-cli

To work with the connector by using the spark-cli (that is, spark-shell, pyspark, or spark-submit), you can use the --packages parameter with the connector's Maven coordinates.

spark-shell --master yarn --packages "com.microsoft.azure:azure-cosmosdb-spark_2.4.0_2.11:1.4.0"
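
The same --packages parameter works with pyspark and spark-submit. For example, to start a PySpark shell with the connector on the classpath:

pyspark --master yarn --packages "com.microsoft.azure:azure-cosmosdb-spark_2.4.0_2.11:1.4.0"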

Using Jupyter notebooks

If you're using Jupyter notebooks within HDInsight, you can use the spark-magic %%configure cell to specify the connector's Maven coordinates.

{ "name":"Spark-to-Cosmos_DB_Connector",
  "conf": {
    "spark.jars.packages": "com.microsoft.azure:azure-cosmosdb-spark_2.4.0_2.11:1.4.0",
    "spark.jars.excludes": "org.scala-lang:scala-reflect"
   }
   ...
}

Note that the inclusion of spark.jars.excludes is needed to remove potential conflicts between the connector, Apache Spark, and Livy.

Build the connector

Currently, this connector project uses Maven, so to build it without dependencies you can run:

mvn clean package

Working with our samples

The Cosmos DB Spark GitHub repository has the following sample notebooks and scripts that you can try.

  • On-Time Flight Performance with Spark and Cosmos DB (Seattle) ipynb | html: Connect Spark to Cosmos DB by using the HDInsight Jupyter notebook service to showcase Spark SQL and GraphFrames, and to predict flight delays by using ML pipelines.

  • Using Apache Spark to query Cosmos DB Graphs: ipynb | html

  • Lambda Architecture with Azure Cosmos DB and HDInsight (Apache Spark): You can reduce the operational overhead of maintaining big data pipelines by using Cosmos DB and Spark.

More information

We have more information in the azure-cosmosdb-spark wiki, including:

Configuration and Setup

Troubleshooting

Performance

Change Feed

Monitoring

Next steps

azure-cosmosdb-spark GitHub 存储库下载 Spark 到 Azure Cosmos DB 连接器(如果尚未下载)。If you haven't already, download the Spark to Azure Cosmos DB connector from the azure-cosmosdb-spark GitHub repository. 浏览存储库中的其他资源:Explore the following additional resources in the repo:

You might also want to review the Apache Spark SQL, DataFrames, and Datasets Guide, and the Apache Spark on Azure HDInsight article.