Create/Insert data into Azure Cosmos DB Cassandra API from Spark

This article describes how to insert sample data into a table in Azure Cosmos DB Cassandra API from Spark.

Cassandra API configuration

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//Azure Cosmos DB helper library (provides the retry-enabled connection factory configured below)
import com.microsoft.azure.cosmosdb.cassandra

//Connection-related
spark.conf.set("spark.cassandra.connection.host","YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.cn")
spark.conf.set("spark.cassandra.connection.port","10350")
spark.conf.set("spark.cassandra.connection.ssl.enabled","true")
spark.conf.set("spark.cassandra.auth.username","YOUR_ACCOUNT_NAME")
spark.conf.set("spark.cassandra.auth.password","YOUR_ACCOUNT_KEY")
spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")
//Throughput-related settings; adjust as needed for your workload
spark.conf.set("spark.cassandra.output.batch.size.rows", "1")
spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10")
spark.conf.set("spark.cassandra.output.concurrent.writes", "1000")
spark.conf.set("spark.cassandra.concurrent.reads", "512")
spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000")
spark.conf.set("spark.cassandra.connection.keep_alive_ms", "600000000")
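Hard-coding the account key in a notebook makes it easy to leak. The following is a minimal sketch, not part of the original article, that builds the same connection settings as a `Map` and reads the account name and key from environment variables. The object name `CosmosCassandraSettings` and the variables `COSMOS_ACCOUNT` and `COSMOS_KEY` are hypothetical; the setting keys and values are the ones shown above.

```scala
// Hypothetical helper: collects the connection settings shown above in one Map
// so they can be applied in a loop instead of one spark.conf.set call per line.
object CosmosCassandraSettings {
  def connectionSettings(account: String, key: String): Map[String, String] = Map(
    "spark.cassandra.connection.host"        -> s"$account.cassandra.cosmosdb.azure.cn",
    "spark.cassandra.connection.port"        -> "10350",
    "spark.cassandra.connection.ssl.enabled" -> "true",
    "spark.cassandra.auth.username"          -> account,
    "spark.cassandra.auth.password"          -> key,
    "spark.cassandra.connection.factory"     ->
      "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory"
  )

  def main(args: Array[String]): Unit = {
    // Assumed environment variable names; fall back to the article's placeholders
    val account = sys.env.getOrElse("COSMOS_ACCOUNT", "YOUR_ACCOUNT_NAME")
    val key     = sys.env.getOrElse("COSMOS_KEY", "YOUR_ACCOUNT_KEY")
    val settings = connectionSettings(account, key)
    // In a Spark notebook you would apply them with:
    //   settings.foreach { case (k, v) => spark.conf.set(k, v) }
    settings.toSeq.sortBy(_._1).foreach { case (k, v) =>
      // Mask the password so the key never appears in notebook output or logs
      val shown = if (k == "spark.cassandra.auth.password") "****" else v
      println(s"$k = $shown")
    }
  }
}
```

The throughput-related settings can be added to the same map in the same way. Keeping them separate makes it easier to tune them without touching the credentials.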

DataFrame API

Create a DataFrame with sample data

// Generate a dataframe containing five records
val booksDF = Seq(
   ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887),
   ("b00023", "Arthur Conan Doyle", "A sign of four", 1890),
   ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892),
   ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893),
   ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901)
).toDF("book_id", "book_author", "book_name", "book_pub_year")

//Review schema
booksDF.printSchema

//Print
booksDF.show

Note

"Create if not exists" functionality is not yet supported at the row level.
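Without a row-level "if not exists" check, a write behaves as a Cassandra upsert: a row whose primary key already exists is overwritten rather than rejected. The following pure-Scala sketch models that behavior with an in-memory `Map`. It assumes `book_id` is the primary key of `books_ks.books`, which is defined elsewhere and not shown in this article. It also shows a hypothetical client-side filter that skips existing keys. Such a read-then-write check is not atomic, so concurrent writers can still overwrite each other.

```scala
// Pure-Scala model of upsert semantics; no database connection is involved.
// Assumes book_id is the table's primary key (an assumption for this sketch).
object UpsertModel {
  final case class Book(id: String, author: String, name: String, year: Int)

  // Each write replaces any existing row with the same key (last write wins)
  def upsert(table: Map[String, Book], rows: Seq[Book]): Map[String, Book] =
    rows.foldLeft(table)((t, b) => t + (b.id -> b))

  // Hypothetical client-side "insert only if absent": drop rows whose key exists.
  // This is a read-then-write check and is NOT atomic under concurrent writers.
  def insertIfAbsent(table: Map[String, Book], rows: Seq[Book]): Map[String, Book] =
    upsert(table, rows.filterNot(b => table.contains(b.id)))

  def main(args: Array[String]): Unit = {
    val original = Book("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887)
    val edited   = original.copy(name = "A Study in Scarlet")
    val t0 = upsert(Map.empty, Seq(original))
    println(upsert(t0, Seq(edited))("b00001").name)         // A Study in Scarlet
    println(insertIfAbsent(t0, Seq(edited))("b00001").name) // A study in scarlet
  }
}
```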

Persist to Azure Cosmos DB Cassandra API

When saving data, you can also set time-to-live (TTL) and consistency policy settings, as shown in the following example:

//Persist
booksDF.write
  .mode("append")
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
  .save()

Note

Column-level TTL is not yet supported.
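Cassandra TTL values are whole seconds, so `"ttl" -> "10000000"` above keeps each row for roughly 115.7 days. The following sketch uses a hypothetical helper, `cassandraWriteOptions`, to build the same writer options map from a Scala `FiniteDuration`. The helper rejects values outside Apache Cassandra's documented maximum TTL of 630,720,000 seconds (20 years). Whether Cosmos DB applies exactly the same upper bound is an assumption here.

```scala
import scala.concurrent.duration._

object WriteOptions {
  // Apache Cassandra's maximum TTL (20 years), in seconds
  val MaxTtlSeconds: Long = 630720000L

  // Hypothetical helper: builds the DataFrame writer options used above and
  // converts a Duration into the whole-second TTL string the option expects.
  def cassandraWriteOptions(keyspace: String, table: String, ttl: FiniteDuration,
                            consistency: String = "ALL"): Map[String, String] = {
    val seconds = ttl.toSeconds
    require(seconds > 0 && seconds <= MaxTtlSeconds, s"TTL out of range: $seconds s")
    Map(
      "table" -> table,
      "keyspace" -> keyspace,
      "output.consistency.level" -> consistency,
      "ttl" -> seconds.toString
    )
  }

  def main(args: Array[String]): Unit = {
    val opts = cassandraWriteOptions("books_ks", "books", 10000000.seconds)
    println(opts("ttl"))             // 10000000
    println(10000000.seconds.toDays) // 115 (whole days, truncated)
    // In a notebook: booksDF.write.mode("append").format("org.apache.spark.sql.cassandra").options(opts).save()
  }
}
```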

Validate in cqlsh

use books_ks;
select * from books;

Resilient Distributed Dataset (RDD) API

Create an RDD with sample data

//Delete records created in the previous section 
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("delete from books_ks.books where book_id in ('b00300','b00001','b00023','b00501','b09999','b01001','b00999','b03999','b02999');"))

//Create RDD
val booksRDD = sc.parallelize(Seq(
   ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887),
   ("b00023", "Arthur Conan Doyle", "A sign of four", 1890),
   ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892),
   ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893),
   ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901)
))

//Review
booksRDD.take(2).foreach(println)
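When an RDD of tuples is saved with `SomeColumns(...)`, each tuple element is matched to the listed column in the same position. The order of the tuple fields therefore has to match the column list. The following pure-Scala sketch makes that pairing visible. It also generates the cleanup `DELETE` from the same IDs used to create the data, so the hard-coded ID list above cannot drift out of sync. The names `RddColumnMapping`, `toRow`, and `deleteStatement` are hypothetical. Building CQL by string concatenation is only reasonable for trusted literals like these sample IDs. Prefer bound parameters for anything user-supplied.

```scala
object RddColumnMapping {
  // Same column order as the SomeColumns(...) list used when saving
  val columns = Seq("book_id", "book_author", "book_name", "book_pub_year")

  val books = Seq(
    ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887),
    ("b00023", "Arthur Conan Doyle", "A sign of four", 1890)
  )

  // Pair each tuple element with the column at the same position
  def toRow(t: Product): Seq[(String, Any)] = {
    require(t.productArity == columns.size, "tuple arity must match column count")
    columns.zip(t.productIterator.toSeq)
  }

  // Build the cleanup statement from the IDs actually used (trusted literals only)
  def deleteStatement(ids: Seq[String]): String =
    ids.map(id => s"'$id'").mkString("delete from books_ks.books where book_id in (", ",", ");")

  def main(args: Array[String]): Unit = {
    books.map(t => toRow(t)).foreach(println)
    println(deleteStatement(books.map(_._1)))
    // delete from books_ks.books where book_id in ('b00001','b00023');
  }
}
```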

Note

"Create if not exists" functionality is not yet supported.

Persist to Azure Cosmos DB Cassandra API

When saving data to Cassandra API, you can also set time-to-live (TTL) and consistency policy settings, as shown in the following example:

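import com.datastax.spark.connector.writer._
//ConsistencyLevel comes from the DataStax Java driver used by connector 2.x
import com.datastax.driver.core.ConsistencyLevel

//Persist with a constant TTL (in seconds) and consistency level ALL
booksRDD.saveToCassandra("books_ks", "books",
  SomeColumns("book_id", "book_author", "book_name", "book_pub_year"),
  writeConf = WriteConf(ttl = TTLOption.constant(900000), consistencyLevel = ConsistencyLevel.ALL))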
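`TTLOption.constant(900000)` means each row written in that call expires 900,000 seconds after it is written. That is exactly 250 hours, or 10 days and 10 hours. The following small `java.time` sketch, not part of the original article, computes the expiry instant for a given write time. It can help when you pick a TTL that must line up with a retention window. `TtlExpiry` and `expiresAt` are hypothetical names, and the write timestamp is an arbitrary example.

```scala
import java.time.{Duration, Instant}

object TtlExpiry {
  // Given when a row was written and its TTL in seconds, compute when it expires
  def expiresAt(writtenAt: Instant, ttlSeconds: Int): Instant =
    writtenAt.plusSeconds(ttlSeconds.toLong)

  def main(args: Array[String]): Unit = {
    val ttl = 900000 // same value as TTLOption.constant(900000) above
    println(Duration.ofSeconds(ttl.toLong))                        // PT250H
    println(expiresAt(Instant.parse("2024-01-01T00:00:00Z"), ttl)) // 2024-01-11T10:00:00Z
  }
}
```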

Validate in cqlsh

use books_ks;
select * from books;

Next steps

After inserting data into the Azure Cosmos DB Cassandra API table, proceed to the following articles to perform other operations on the data stored in Cosmos DB Cassandra API: