在 Spark 的 Azure Cosmos DB for Apache Cassandra 中执行的 DDL 操作

适用对象: Cassandra

本文详细介绍了针对 Spark 上的 Azure Cosmos DB for Apache Cassandra 的密钥空间和表 DDL 操作。

Spark 上下文

API for Cassandra 的连接器要求将 Cassandra 连接的详细信息作为 spark 上下文的一部分进行初始化。 当你启动笔记本时,Spark 上下文已初始化。建议你不要停止并重新初始化它。 一种解决方案是在群集 spark 配置中添加群集级别的 API for Cassandra 实例配置。 这是每个群集的一次性活动。 将以下代码添加到 Spark 配置,作为空格分隔的键值对:

spark.cassandra.connection.host YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.cn
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_COSMOSDB_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_COSMOSDB_KEY

//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
spark.cassandra.output.concurrent.writes  1000  
spark.cassandra.concurrent.reads  512  
spark.cassandra.output.batch.grouping.buffer.size  1000  
spark.cassandra.connection.keep_alive_ms  600000000  
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
//spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")

注意

如果使用的是 Spark 3.x,则无需安装 Azure Cosmos DB 帮助程序和连接工厂。 对于 Spark 3 连接器,还应该使用 remoteConnectionsPerExecutor 而不是 connections_per_executor_max(见上文)。

警告

本文展示的 Spark 3 示例已使用 Spark 3.2.1 版本和相应的 Cassandra Spark 连接器 com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.1 测试过。 更高版本的 Spark 和/或 Cassandra 连接器可能无法按预期运行。

密钥空间 DDL 操作

创建密钥空间

//Cassandra connector instance
val cdbConnector = CassandraConnector(sc)

// Create keyspace
cdbConnector.withSessionDo(session => session.execute("CREATE KEYSPACE IF NOT EXISTS books_ks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 } "))

在 cqlsh 中验证

在 cqlsh 中运行以下命令,可看到先前创建的密钥空间。

DESCRIBE keyspaces;

删除密钥空间

val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP KEYSPACE books_ks"))

在 cqlsh 中验证

DESCRIBE keyspaces;

表 DDL 操作

注意事项:

  • 可使用 create table 语句在表级别分配吞吐量。
  • 一个分区键可存储 20 GB 的数据。
  • 一条记录最多可存储 2 MB 的数据。
  • 一个分区键范围可存储多个分区键。

创建表

cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT, PRIMARY KEY(book_id,book_pub_year)) WITH cosmosdb_provisioned_throughput=4000 , WITH default_time_to_live=630720000;"))

在 cqlsh 中验证

在 cqlsh 中运行以下命令,应看到名为“books”的表:

USE books_ks;
DESCRIBE books;

预配的吞吐量和默认 TTL 值未显示在上一个命令的输出中,可从门户获取这些值。

更改表

可使用 alter table 命令更改以下值:

  • 预配的吞吐量
  • 生存时间值
    目前不支持更改列。
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("ALTER TABLE books_ks.books WITH cosmosdb_provisioned_throughput=8000, WITH default_time_to_live=0;"))

删除表

val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP TABLE IF EXISTS books_ks.books;"))

在 cqlsh 中验证

在 cqlsh 中运行以下命令,可看到“books”表不再可用:

USE books_ks;
DESCRIBE tables;

后续步骤

创建密钥空间和表后,请继续阅读以下有关 CRUD 操作的文章: