Quickstart: Manage data with Azure Cosmos DB Spark 3 OLTP Connector for SQL API

APPLIES TO: SQL API

This quickstart shows how to use the Cosmos DB Spark Connector to read from and write to Azure Cosmos DB. The Cosmos DB Spark Connector supports Spark 3.1.x and 3.2.x.

Throughout this quickstart, we use Azure Databricks Runtime 8.0 with Spark 3.1.1 and a Jupyter Notebook to show how to use the Cosmos DB Spark Connector, but you can also use Azure Databricks Runtime 10.3 with Spark 3.2.1.

You can use any other Spark 3.1.1 or 3.2.1 offering as well. You should also be able to use any language supported by Spark (PySpark, Scala, Java, and so on) and any Spark interface you are familiar with (Jupyter Notebook, Livy, and so on).

Prerequisites

SLF4J is only needed if you plan to use logging. In that case, also download an SLF4J binding, which links the SLF4J API with the logging implementation of your choice. See the SLF4J user manual for more information.

Install the Cosmos DB Spark Connector in your Spark cluster using the latest version for Spark 3.1.x or the latest version for Spark 3.2.x.
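
If you are not using the Databricks library UI, one option is to attach the connector as a Maven package when building the Spark session. The following is a minimal sketch: the version 4.x.x is a placeholder for the latest release of the Spark 3.1 artifact, and azure-cosmos-spark_3-2_2-12 is the artifact to use instead for Spark 3.2.x.

# Sketch: attach the connector as a Maven package when building a SparkSession.
# Replace 4.x.x with the latest connector release for your Spark version.
from pyspark.sql import SparkSession

spark = SparkSession.builder\
  .appName("cosmos-quickstart")\
  .config("spark.jars.packages", "com.azure.cosmos.spark:azure-cosmos-spark_3-1_2-12:4.x.x")\
  .getOrCreate()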

This getting started guide is based on PySpark, but you can use the equivalent Scala version as well. You can run the following code snippets in an Azure Databricks PySpark notebook.

Create databases and containers

First, set the Cosmos DB account credentials, the Cosmos DB database name, and the container name.

cosmosEndpoint = "https://REPLACEME.documents.azure.cn:443/"
cosmosMasterKey = "REPLACEME"
cosmosDatabaseName = "sampleDB"
cosmosContainerName = "sampleContainer"

cfg = {
  "spark.cosmos.accountEndpoint" : cosmosEndpoint,
  "spark.cosmos.accountKey" : cosmosMasterKey,
  "spark.cosmos.database" : cosmosDatabaseName,
  "spark.cosmos.container" : cosmosContainerName,
}

Next, you can use the new Catalog API to create a Cosmos DB Database and Container through Spark.

# Configure Catalog Api to be used
spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", cosmosEndpoint)
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", cosmosMasterKey)

# create a cosmos database using catalog api
spark.sql("CREATE DATABASE IF NOT EXISTS cosmosCatalog.{};".format(cosmosDatabaseName))

# create a cosmos container using catalog api
spark.sql("CREATE TABLE IF NOT EXISTS cosmosCatalog.{}.{} using cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/id', manualThroughput = '1100')".format(cosmosDatabaseName, cosmosContainerName))

When creating containers with the Catalog API, you can set the throughput and partition key path for the container to be created.
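
For example, if you prefer autoscale over manual throughput, the container can be created with the autoScaleMaxThroughput property instead. The following is a minimal sketch; the container name used here is illustrative.

# Sketch: create a container with autoscale throughput instead of manual throughput.
# The container name sampleAutoscaleContainer is illustrative.
spark.sql("CREATE TABLE IF NOT EXISTS cosmosCatalog.{}.sampleAutoscaleContainer using cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/id', autoScaleMaxThroughput = '4000')".format(cosmosDatabaseName))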

For more information, see the full Catalog API documentation.

Ingest data

The name of the data source is cosmos.oltp. The following example shows how you can write an in-memory dataframe consisting of two items to Cosmos DB:

spark.createDataFrame((("cat-alive", "Schrodinger cat", 2, True), ("cat-dead", "Schrodinger cat", 2, False)))\
  .toDF("id","name","age","isAlive") \
  .write\
  .format("cosmos.oltp")\
  .options(**cfg)\
  .mode("APPEND")\
  .save()

Note that id is a mandatory field for Cosmos DB.

For more information related to ingesting data, see the full write configuration documentation.

Query data

Using the same cosmos.oltp data source, we can query data and use filter to push filters down to Cosmos DB:

from pyspark.sql.functions import col

df = spark.read.format("cosmos.oltp").options(**cfg)\
 .option("spark.cosmos.read.inferSchema.enabled", "true")\
 .load()

df.filter(col("isAlive") == True)\
 .show()
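
Because the Cosmos catalog was configured earlier, a query can also be expressed in Spark SQL against the catalog table. The following is a minimal sketch that assumes the database and container created above:

# Sketch: query the same container through the Cosmos catalog with Spark SQL,
# assuming the catalog configuration and the container created earlier.
spark.sql("SELECT * FROM cosmosCatalog.{}.{}".format(cosmosDatabaseName, cosmosContainerName)).show()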

For more information related to querying data, see the full query configuration documentation.

Schema inference

When querying data, the Spark Connector can infer the schema based on sampling existing items by setting spark.cosmos.read.inferSchema.enabled to true.

df = spark.read.format("cosmos.oltp").options(**cfg)\
 .option("spark.cosmos.read.inferSchema.enabled", "true")\
 .load()

df.printSchema()

Alternatively, you can pass a custom schema to be used when reading the data:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType

customSchema = StructType([
      StructField("id", StringType()),
      StructField("name", StringType()),
      StructField("type", StringType()),
      StructField("age", IntegerType()),
      StructField("isAlive", BooleanType())
    ])

df = spark.read.schema(customSchema).format("cosmos.oltp").options(**cfg)\
 .load()

df.printSchema()

If no custom schema is specified and schema inference is disabled, the resulting data will contain the raw JSON content of the items:

df = spark.read.format("cosmos.oltp").options(**cfg)\
 .load()

df.printSchema()

For more information related to schema inference, see the full schema inference configuration documentation.

Configuration reference

The Azure Cosmos DB Spark 3 OLTP Connector for SQL API has a complete configuration reference that provides additional and advanced settings for writing and querying data, serialization, streaming using change feed, partitioning, throughput management, and more. For a complete listing with details, see the Spark Connector Configuration Reference on GitHub.
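
As one example of those settings, the following sketch reads the container's change feed as a structured stream and writes it to a console sink. The checkpoint path is a placeholder, and the options shown are only a subset; consult the configuration reference for the full list of change feed settings.

# Sketch: read the container's change feed as a structured stream.
# The checkpoint path below is a placeholder; see the configuration reference for all settings.
changeFeedDF = spark.readStream.format("cosmos.oltp.changeFeed")\
  .options(**cfg)\
  .option("spark.cosmos.read.inferSchema.enabled", "true")\
  .option("spark.cosmos.changeFeed.startFrom", "Beginning")\
  .load()

query = changeFeedDF.writeStream\
  .format("console")\
  .option("checkpointLocation", "/tmp/cosmos-changefeed-checkpoint")\
  .start()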

Next steps