通过 Spark 创建/插入数据到 Azure Cosmos DB for Apache Cassandra
适用对象: Cassandra
本文介绍如何通过 Spark 将示例数据插入 Azure Cosmos DB for Apache Cassandra 的表中。
API for Cassandra 配置
在笔记本群集中设置以下 spark 配置。 这是一次性活动。
//Connection-related
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.cn
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
注意
如果使用的是 Spark 3.x,则无需安装 Azure Cosmos DB 帮助程序和连接工厂。 对于 Spark 3 连接器,还应该使用 remoteConnectionsPerExecutor
而不是 connections_per_executor_max
(见上文)。
警告
本文展示的 Spark 3 示例已使用 Spark 3.2.1 版本和相应的 Cassandra Spark 连接器 com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0 测试过。 更高版本的 Spark 和/或 Cassandra 连接器可能无法按预期运行。
数据帧 API
使用示例数据创建数据帧
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
// Generate a dataframe containing five records
val booksDF = Seq(
("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887),
("b00023", "Arthur Conan Doyle", "A sign of four", 1890),
("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892),
("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893),
("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901)
).toDF("book_id", "book_author", "book_name", "book_pub_year")
//Review schema
booksDF.printSchema
//Print
booksDF.show
注意
尚不支持在行级别实现“不存在时创建”功能。
保存至 Azure Cosmos DB for Apache Cassandra
保存数据时,还可以设置生存时间和一致性策略设置,如以下示例所示:
//Persist
booksDF.write
.mode("append")
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
.save()
在 cqlsh 中验证
use books_ks;
select * from books;
可复原分布式数据库 (RDD) API
使用示例数据创建 RDD
//Drop and re-create table to delete records created in the previous section
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP TABLE IF EXISTS books_ks.books;"))
cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT, PRIMARY KEY(book_id,book_pub_year)) WITH cosmosdb_provisioned_throughput=4000 , WITH default_time_to_live=630720000;"))
//Create RDD
val booksRDD = sc.parallelize(Seq(
("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887),
("b00023", "Arthur Conan Doyle", "A sign of four", 1890),
("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892),
("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893),
("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901)
))
//Review
booksRDD.take(2).foreach(println)
注意
尚不支持“不存在时创建”功能。
保存至 Azure Cosmos DB for Apache Cassandra
将数据保存到 API for Cassandra 时,还可以设置生存时间和一致性策略设置,如以下示例所示:
import com.datastax.spark.connector.writer._
import com.datastax.oss.driver.api.core.ConsistencyLevel
//Persist
booksRDD.saveToCassandra("books_ks", "books", SomeColumns("book_id", "book_author", "book_name", "book_pub_year"),writeConf = WriteConf(ttl = TTLOption.constant(900000),consistencyLevel = ConsistencyLevel.ALL))
在 cqlsh 中验证
use books_ks;
select * from books;
后续步骤
将数据插入 Azure Cosmos DB for Apache Cassandra 表后,请继续阅读以下文章,对 Azure Cosmos DB for Apache Cassandra 中存储的数据执行其他操作: