使用 Spark 读取 zure Cosmos DB for Apache Cassandra 表中的数据

适用对象: Cassandra

本文介绍如何从 Spark 读取存储在 Azure Cosmos DB for Apache Cassandra 中的数据。

API for Cassandra 配置

在笔记本群集中设置以下 spark 配置。 这是一次性活动。

//Connection-related
 spark.cassandra.connection.host  YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.cn  
 spark.cassandra.connection.port  10350  
 spark.cassandra.connection.ssl.enabled  true  
 spark.cassandra.auth.username  YOUR_ACCOUNT_NAME  
 spark.cassandra.auth.password  YOUR_ACCOUNT_KEY  
// if using Spark 2.x
// spark.cassandra.connection.factory  com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory  

//Throughput-related...adjust as needed
 spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
 spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
 spark.cassandra.output.concurrent.writes  1000  
 spark.cassandra.concurrent.reads  512  
 spark.cassandra.output.batch.grouping.buffer.size  1000  
 spark.cassandra.connection.keep_alive_ms  600000000  

注意

如果使用的是 Spark 3.x,则无需安装 Azure Cosmos DB 帮助程序和连接工厂。 对于 Spark 3 连接器,还应该使用 remoteConnectionsPerExecutor 而不是 connections_per_executor_max(见上文)。

警告

本文展示的 Spark 3 示例已使用 Spark 3.2.1 版本和相应的 Cassandra Spark 连接器 com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0 测试过。 更高版本的 Spark 和/或 Cassandra 连接器可能无法按预期运行。

数据帧 API

使用 session.read.format 命令读取表

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra

val readBooksDF = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load

readBooksDF.explain
readBooksDF.show

使用 spark.read.cassandraFormat 读取表

val readBooksDF = spark.read.cassandraFormat("books", "books_ks", "").load()

读取表中特定的列

val readBooksDF = spark
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load
  .select("book_name","book_author", "book_pub_year")

readBooksDF.printSchema
readBooksDF.explain
readBooksDF.show

应用筛选器

可以将谓词向下推送到数据库,以便更好地优化 Spark 查询。 谓词是返回 true 或 false 的查询的条件,通常位于 WHERE 子句中。 谓词向下推送会筛选数据库查询中的数据,减少从数据库中检索到的条目数,提高查询性能。 默认情况下,Spark 数据集 API 会自动将有效的 WHERE 子句向下推送到数据库。

val df = spark.read.cassandraFormat("books", "books_ks").load
df.explain
val dfWithPushdown = df.filter(df("book_pub_year") > 1891)
dfWithPushdown.explain

readBooksDF.printSchema
readBooksDF.explain
readBooksDF.show

物理计划的 Cassandra Filters 部分包括向下推送的筛选器。

partitions

RDD API

读取表

val bookRDD = sc.cassandraTable("books_ks", "books")
bookRDD.take(5).foreach(println)

读取表中特定的列

val booksRDD = sc.cassandraTable("books_ks", "books").select("book_id","book_name").cache
booksRDD.take(5).foreach(println)

SQL 视图

从 dataframe 创建临时视图

spark
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load.createOrReplaceTempView("books_vw")

针对视图运行查询

select * from books_vw where book_pub_year > 1891

后续步骤

以下是有关从 Spark 使用 Azure Cosmos DB for Apache Cassandra 的其他文章: