使用 Spark 读取 zure Cosmos DB for Apache Cassandra 表中的数据
适用对象: Cassandra
本文介绍如何从 Spark 读取存储在 Azure Cosmos DB for Apache Cassandra 中的数据。
API for Cassandra 配置
在笔记本群集中设置以下 spark 配置。 这是一次性活动。
//Connection-related
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.cn
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
注意
如果使用的是 Spark 3.x,则无需安装 Azure Cosmos DB 帮助程序和连接工厂。 对于 Spark 3 连接器,还应该使用 remoteConnectionsPerExecutor
而不是 connections_per_executor_max
(见上文)。
警告
本文展示的 Spark 3 示例已使用 Spark 3.2.1 版本和相应的 Cassandra Spark 连接器 com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0 测试过。 更高版本的 Spark 和/或 Cassandra 连接器可能无法按预期运行。
数据帧 API
使用 session.read.format 命令读取表
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
val readBooksDF = sqlContext
.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load
readBooksDF.explain
readBooksDF.show
使用 spark.read.cassandraFormat 读取表
val readBooksDF = spark.read.cassandraFormat("books", "books_ks", "").load()
读取表中特定的列
val readBooksDF = spark
.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load
.select("book_name","book_author", "book_pub_year")
readBooksDF.printSchema
readBooksDF.explain
readBooksDF.show
应用筛选器
可以将谓词向下推送到数据库,以便更好地优化 Spark 查询。 谓词是返回 true 或 false 的查询的条件,通常位于 WHERE 子句中。 谓词向下推送会筛选数据库查询中的数据,减少从数据库中检索到的条目数,提高查询性能。 默认情况下,Spark 数据集 API 会自动将有效的 WHERE 子句向下推送到数据库。
val df = spark.read.cassandraFormat("books", "books_ks").load
df.explain
val dfWithPushdown = df.filter(df("book_pub_year") > 1891)
dfWithPushdown.explain
readBooksDF.printSchema
readBooksDF.explain
readBooksDF.show
物理计划的 Cassandra Filters
部分包括向下推送的筛选器。
RDD API
读取表
val bookRDD = sc.cassandraTable("books_ks", "books")
bookRDD.take(5).foreach(println)
读取表中特定的列
val booksRDD = sc.cassandraTable("books_ks", "books").select("book_id","book_name").cache
booksRDD.take(5).foreach(println)
SQL 视图
从 dataframe 创建临时视图
spark
.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load.createOrReplaceTempView("books_vw")
针对视图运行查询
select * from books_vw where book_pub_year > 1891
后续步骤
以下是有关从 Spark 使用 Azure Cosmos DB for Apache Cassandra 的其他文章: