Aggregate operations on Azure Cosmos DB for Apache Cassandra tables from Spark

APPLIES TO: Cassandra

This article describes basic aggregation operations against Azure Cosmos DB for Apache Cassandra tables from Spark.

Note

Server-side filtering and server-side aggregation are currently not supported in Azure Cosmos DB for Apache Cassandra.

API for Cassandra configuration

Set the following Spark configuration on your notebook cluster. It's a one-time activity.

//Connection-related
 spark.cassandra.connection.host  YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.cn  
 spark.cassandra.connection.port  10350  
 spark.cassandra.connection.ssl.enabled  true  
 spark.cassandra.auth.username  YOUR_ACCOUNT_NAME  
 spark.cassandra.auth.password  YOUR_ACCOUNT_KEY  
// if using Spark 2.x
// spark.cassandra.connection.factory  com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory  

//Throughput-related...adjust as needed
 spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
 spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
 spark.cassandra.output.concurrent.writes  1000  
 spark.cassandra.concurrent.reads  512  
 spark.cassandra.output.batch.grouping.buffer.size  1000  
 spark.cassandra.connection.keep_alive_ms  600000000  

Note

If you are using Spark 3.x, you do not need to install the Azure Cosmos DB helper and connection factory. You should also use remoteConnectionsPerExecutor instead of connections_per_executor_max for the Spark 3 connector (see above).

Warning

The Spark 3 samples shown in this article have been tested with Spark version 3.2.1 and the corresponding Cassandra Spark Connector com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0. Later versions of Spark and/or the Cassandra connector may not function as expected.
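
If you submit a standalone Spark job instead of working in an interactive notebook, the same keys can be passed to the SparkSession builder in code. The following Scala sketch is illustrative only and is not part of the original configuration steps; the account name and key are placeholders, and the values mirror the Spark 3.x settings listed above:

import org.apache.spark.sql.SparkSession

//Illustrative only: programmatic equivalent of the cluster configuration above (Spark 3.x values)
val spark = SparkSession.builder()
  .appName("cosmos-cassandra-aggregations")
  .config("spark.cassandra.connection.host", "YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.cn")
  .config("spark.cassandra.connection.port", "10350")
  .config("spark.cassandra.connection.ssl.enabled", "true")
  .config("spark.cassandra.auth.username", "YOUR_ACCOUNT_NAME")
  .config("spark.cassandra.auth.password", "YOUR_ACCOUNT_KEY")
  .config("spark.cassandra.connection.remoteConnectionsPerExecutor", "10")
  .config("spark.cassandra.output.batch.size.rows", "1")
  .config("spark.cassandra.output.concurrent.writes", "1000")
  .config("spark.cassandra.concurrent.reads", "512")
  .config("spark.cassandra.output.batch.grouping.buffer.size", "1000")
  .config("spark.cassandra.connection.keep_alive_ms", "600000000")
  .getOrCreate()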

Sample data generator

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.functions._

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra

// Generate a simple dataset containing five values
val booksDF = Seq(
   ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
   ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
   ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
   ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
   ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")

booksDF.write
  .mode("append")
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
  .save()
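
The write above assumes the books_ks keyspace and books table already exist. If they don't, a DDL sketch along the following lines can create them first; the schema shown here is an assumption inferred from the columns used above and is not part of this article, so adjust types and throughput to your needs:

//Assumed DDL (not part of the original sample): create a keyspace/table matching the columns above
import com.datastax.spark.connector.cql.CassandraConnector

val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => {
  session.execute("CREATE KEYSPACE IF NOT EXISTS books_ks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}")
  session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT PRIMARY KEY, book_author TEXT, book_name TEXT, book_pub_year INT, book_price FLOAT)")
})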

Count operation

RDD API

sc.cassandraTable("books_ks", "books").count

Output:

count: Long = 5

DataFrame API

Count against dataframes is currently not supported. The sample below shows how to run a dataframe count after persisting the dataframe to memory as a workaround.

Choose a storage option from the following available options, to avoid running into "out of memory" issues:

  • MEMORY_ONLY: This is the default storage option. It stores the RDD as deserialized Java objects in the JVM. If the RDD doesn't fit in memory, some partitions aren't cached and are recomputed on the fly each time they're needed.

  • MEMORY_AND_DISK: Stores the RDD as deserialized Java objects in the JVM. If the RDD doesn't fit in memory, the partitions that don't fit are stored on disk and read from there whenever they're needed.

  • MEMORY_ONLY_SER (Java/Scala): Stores the RDD as serialized Java objects, one byte array per partition. This option is space-efficient compared with deserialized objects, especially when a fast serializer is used, but it's more CPU-intensive to read.

  • MEMORY_AND_DISK_SER (Java/Scala): This storage option is like MEMORY_ONLY_SER; the only difference is that it spills partitions that don't fit in memory to disk instead of recomputing them when they're needed.

  • DISK_ONLY: Stores the RDD partitions on disk only.

  • MEMORY_ONLY_2, MEMORY_AND_DISK_2…: Same as the levels above, but each partition is replicated on two cluster nodes.

  • OFF_HEAP (experimental): Similar to MEMORY_ONLY_SER, but it stores the data in off-heap memory, and it requires off-heap memory to be enabled ahead of time.

    //Workaround
    import org.apache.spark.storage.StorageLevel
    
    //Read from source
    val readBooksDF = spark
      .read
      .cassandraFormat("books", "books_ks", "")
      .load()
    
    //Explain plan
    readBooksDF.explain
    
    //Materialize the dataframe
    readBooksDF.persist(StorageLevel.MEMORY_ONLY)
    
    //Subsequent execution against this DF hits the cache 
    readBooksDF.count
    
    //Persist as temporary view
    readBooksDF.createOrReplaceTempView("books_vw")
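
If the dataset is too large for MEMORY_ONLY, one option is to switch the persisted dataframe to a spill-to-disk level and release the cache when it's no longer needed. This is a minimal sketch continuing from readBooksDF above, not part of the original workaround:

//Minimal sketch: use a spill-to-disk storage level instead, and free the cache when done
import org.apache.spark.storage.StorageLevel

readBooksDF.unpersist()                            //drop the earlier MEMORY_ONLY cache
readBooksDF.persist(StorageLevel.MEMORY_AND_DISK)  //partitions that don't fit in memory spill to disk
readBooksDF.count                                  //materializes the cache again
readBooksDF.unpersist()                            //release memory/disk once the counts are done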
    

SQL

%sql
select * from books_vw;
select count(*) from books_vw where book_pub_year > 1900;
select count(book_id) from books_vw;
select book_author, count(*) as count from books_vw group by book_author;
select count(*) from books_vw;
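
The %sql cells above run against the books_vw temporary view registered earlier. The same queries can also be issued from Scala through spark.sql, for example (a minimal sketch, not part of the original article):

//Equivalent Scala calls against the temporary view registered above
spark.sql("select count(*) from books_vw").show
spark.sql("select book_author, count(*) as count from books_vw group by book_author").show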

Average operation

RDD API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Double) => c).mean

Output:

res24: Double = 16.016000175476073

DataFrame API

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(avg("book_price"))
  .show

Output:

+------------------+
|   avg(book_price)|
+------------------+
|16.016000175476073|
+------------------+

SQL

select avg(book_price) from books_vw;

Output:

16.016000175476073

Min operation

RDD API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).min

Output:

res31: Float = 11.33

DataFrame API

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_id","book_price")
  .agg(min("book_price"))
  .show

Output:

+---------------+
|min(book_price)|
+---------------+
|          11.33|
+---------------+

SQL

%sql
select min(book_price) from books_vw;

Output:

11.33

Max operation

RDD API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).max

DataFrame API

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(max("book_price"))
  .show

Output:

+---------------+
|max(book_price)|
+---------------+
|          22.45|
+---------------+

SQL

%sql
select max(book_price) from books_vw;

Output:

22.45

Sum operation

RDD API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).sum

Output:

res46: Double = 80.08000087738037

DataFrame API

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(sum("book_price"))
  .show

Output:

+-----------------+
|  sum(book_price)|
+-----------------+
|80.08000087738037|
+-----------------+

SQL

select sum(book_price) from books_vw;

Output:

80.08000087738037
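
For reference, the average, minimum, maximum, and sum shown above can also be computed in a single pass with one agg call. This is a sketch against the same books table, not part of the original article:

//Minimal sketch: several aggregates computed in one pass over the same table
import org.apache.spark.sql.cassandra._
import org.apache.spark.sql.functions._

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .agg(
    avg("book_price").as("avg_price"),
    min("book_price").as("min_price"),
    max("book_price").as("max_price"),
    sum("book_price").as("sum_price"))
  .show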

Top or comparable operations

RDD API

val readCalcTopRDD = sc.cassandraTable("books_ks", "books").select("book_name","book_price").sortBy(_.getFloat(1), false)
readCalcTopRDD.zipWithIndex.filter(_._2 < 3).collect.foreach(println)
//delivers the first top n items without collecting the rdd to the driver.

Output:

(CassandraRow{book_name: A sign of four, book_price: 22.45},0)
(CassandraRow{book_name: The adventures of Sherlock Holmes, book_price: 19.83},1)
(CassandraRow{book_name: The memoirs of Sherlock Holmes, book_price: 14.22},2)
readCalcTopRDD: org.apache.spark.rdd.RDD[com.datastax.spark.connector.CassandraRow] = MapPartitionsRDD[430] at sortBy at command-2371828989676374:1

DataFrame API

import org.apache.spark.sql.functions._

val readBooksDF = spark.read.format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load
  .select("book_name","book_price")
  .orderBy(desc("book_price"))
  .limit(3)

//Explain plan
readBooksDF.explain

//Top
readBooksDF.show

Output:

== Physical Plan ==
TakeOrderedAndProject(limit=3, orderBy=[book_price#1840 DESC NULLS LAST], output=[book_name#1839,book_price#1840])
+- *(1) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@29cd5f58 [book_name#1839,book_price#1840] PushedFilters: [], ReadSchema: struct<book_name:string,book_price:float>
+--------------------+----------+
|           book_name|book_price|
+--------------------+----------+
|      A sign of four|     22.45|
|The adventures of...|     19.83|
|The memoirs of Sh...|     14.22|
+--------------------+----------+

import org.apache.spark.sql.functions._
readBooksDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [book_name: string, book_price: float]

SQL

select book_name,book_price from books_vw order by book_price desc limit 3;

Next steps

To perform table copy operations, see: