Aggregate operations on Azure Cosmos DB for Apache Cassandra tables from Spark
APPLIES TO: Cassandra
This article describes basic aggregation operations against Azure Cosmos DB for Apache Cassandra tables from Spark.
Note
Server-side filtering and server-side aggregation are currently not supported in Azure Cosmos DB for Apache Cassandra.
API for Cassandra configuration
Set the following Spark configuration in your notebook cluster. This is a one-time activity.
//Connection-related
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.cn
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Note
If you are using Spark 3.x, you do not need to install the Azure Cosmos DB helper and connection factory. You should also use remoteConnectionsPerExecutor instead of connections_per_executor_max for the Spark 3 connector (see above).
Warning
The Spark 3 samples shown in this article have been tested with Spark version 3.2.1 and the corresponding Cassandra Spark Connector com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0. Later versions of Spark and/or the Cassandra connector may not function as expected.
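If you prefer not to edit the cluster configuration, the same settings can also be applied at runtime from a Scala notebook. The snippet below is only a minimal sketch of that approach; the account name and key are placeholders, and cluster-level configuration as shown above remains the documented path.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()

//Connection-related (placeholders: substitute your own account name and key)
spark.conf.set("spark.cassandra.connection.host", "YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.cn")
spark.conf.set("spark.cassandra.connection.port", "10350")
spark.conf.set("spark.cassandra.connection.ssl.enabled", "true")
spark.conf.set("spark.cassandra.auth.username", "YOUR_ACCOUNT_NAME")
spark.conf.set("spark.cassandra.auth.password", "YOUR_ACCOUNT_KEY")

//Throughput-related (Spark 3.x)...adjust as needed
spark.conf.set("spark.cassandra.connection.remoteConnectionsPerExecutor", "10")
spark.conf.set("spark.cassandra.output.concurrent.writes", "1000")
spark.conf.set("spark.cassandra.concurrent.reads", "512")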
Sample data generator
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.functions._
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
// Generate a simple dataset containing five values
val booksDF = Seq(
("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")
booksDF.write
.mode("append")
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
.save()
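The write above assumes that the books_ks keyspace and the books table already exist. If they do not, they can be created first over the same connector session. The following is only a sketch: the schema and the cosmosdb_provisioned_throughput value are illustrative assumptions, not part of this article.
import com.datastax.spark.connector.cql.CassandraConnector

//Create keyspace and table if they don't exist yet (sketch; adjust schema and throughput as needed)
val cdbConnector = CassandraConnector(sc.getConf)
cdbConnector.withSessionDo(session => {
  session.execute("CREATE KEYSPACE IF NOT EXISTS books_ks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}")
  session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT PRIMARY KEY, book_author TEXT, book_name TEXT, book_pub_year INT, book_price FLOAT) WITH cosmosdb_provisioned_throughput=4000")
})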
Count operation
RDD API
sc.cassandraTable("books_ks", "books").count
Output:
count: Long = 5
DataFrame API
Count against dataframes is currently not supported. The sample below shows how to execute a dataframe count after persisting the dataframe to memory as a workaround.
Choose a storage option from the following available options to avoid running into "out of memory" issues:
MEMORY_ONLY: This is the default storage option. Stores the RDD as deserialized Java objects in the JVM. If the RDD doesn't fit in memory, some partitions won't be cached, and they're recomputed on the fly each time they're needed.
MEMORY_AND_DISK: Stores the RDD as deserialized Java objects in the JVM. If the RDD doesn't fit in memory, the partitions that don't fit are stored on disk and read from there whenever they're needed.
MEMORY_ONLY_SER (Java/Scala): Stores the RDD as serialized Java objects, with one byte array per partition. This option is space-efficient compared to deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.
MEMORY_AND_DISK_SER (Java/Scala): This storage option is like MEMORY_ONLY_SER; the only difference is that it spills partitions that don't fit in memory to disk instead of recomputing them when they're needed.
DISK_ONLY: Stores the RDD partitions on disk only.
MEMORY_ONLY_2, MEMORY_AND_DISK_2...: Same as the levels above, but replicates each partition on two cluster nodes.
OFF_HEAP (experimental): Similar to MEMORY_ONLY_SER, but it stores the data in off-heap memory, and it requires off-heap memory to be enabled ahead of time.
//Workaround
import org.apache.spark.storage.StorageLevel

//Read from source
val readBooksDF = spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()

//Explain plan
readBooksDF.explain

//Materialize the dataframe
readBooksDF.persist(StorageLevel.MEMORY_ONLY)

//Subsequent execution against this DF hits the cache
readBooksDF.count

//Persist as temporary view
readBooksDF.createOrReplaceTempView("books_vw")
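When the cached dataframe is no longer needed, it can be released explicitly; a small follow-up to the workaround above:
//Release the cached data once the counts are done
readBooksDF.unpersist()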
SQL
%sql
select * from books_vw;
select count(*) from books_vw where book_pub_year > 1900;
select count(book_id) from books_vw;
select book_author, count(*) as count from books_vw group by book_author;
select count(*) from books_vw;
Average operation
RDD API
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Double) => c).mean
Output:
res24: Double = 16.016000175476073
DataFrame API
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_price")
.agg(avg("book_price"))
.show
Output:
+------------------+
| avg(book_price)|
+------------------+
|16.016000175476073|
+------------------+
SQL
select avg(book_price) from books_vw;
Output:
16.016000175476073
Min operation
RDD API
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).min
Output:
res31: Float = 11.33
DataFrame API
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_id","book_price")
.agg(min("book_price"))
.show
Output:
+---------------+
|min(book_price)|
+---------------+
| 11.33|
+---------------+
SQL
%sql
select min(book_price) from books_vw;
Output:
11.33
Max operation
RDD API
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).max
DataFrame API
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_price")
.agg(max("book_price"))
.show
Output:
+---------------+
|max(book_price)|
+---------------+
| 22.45|
+---------------+
SQL
%sql
select max(book_price) from books_vw;
Output:
22.45
Sum operation
RDD API
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).sum
Output:
res46: Double = 80.08000087738037
DataFrame API
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_price")
.agg(sum("book_price"))
.show
Output:
+-----------------+
| sum(book_price)|
+-----------------+
|80.08000087738037|
+-----------------+
SQL
select sum(book_price) from books_vw;
Output:
80.08000087738037
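Each of the aggregations above reads the table in a separate pass. When several aggregates are needed at once, a single agg call can compute them together. This sketch is not taken from the original samples; it uses the same table and the functions already imported above:
import org.apache.spark.sql.cassandra._
import org.apache.spark.sql.functions._

//Compute min, max, average, and sum of book_price in a single pass
spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .agg(
    min("book_price").as("min_price"),
    max("book_price").as("max_price"),
    avg("book_price").as("avg_price"),
    sum("book_price").as("sum_price"))
  .show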
Top or comparable operations
RDD API
val readCalcTopRDD = sc.cassandraTable("books_ks", "books").select("book_name","book_price").sortBy(_.getFloat(1), false)
readCalcTopRDD.zipWithIndex.filter(_._2 < 3).collect.foreach(println)
//delivers the first top n items without collecting the rdd to the driver.
Output:
(CassandraRow{book_name: A sign of four, book_price: 22.45},0)
(CassandraRow{book_name: The adventures of Sherlock Holmes, book_price: 19.83},1)
(CassandraRow{book_name: The memoirs of Sherlock Holmes, book_price: 14.22},2)
readCalcTopRDD: org.apache.spark.rdd.RDD[com.datastax.spark.connector.CassandraRow] = MapPartitionsRDD[430] at sortBy at command-2371828989676374:1
DataFrame API
import org.apache.spark.sql.functions._
val readBooksDF = spark.read.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load
.select("book_name","book_price")
.orderBy(desc("book_price"))
.limit(3)
//Explain plan
readBooksDF.explain
//Top
readBooksDF.show
Output:
== Physical Plan ==
TakeOrderedAndProject(limit=3, orderBy=[book_price#1840 DESC NULLS LAST], output=[book_name#1839,book_price#1840])
+- *(1) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@29cd5f58 [book_name#1839,book_price#1840] PushedFilters: [], ReadSchema: struct<book_name:string,book_price:float>
+--------------------+----------+
| book_name|book_price|
+--------------------+----------+
| A sign of four| 22.45|
|The adventures of...| 19.83|
|The memoirs of Sh...| 14.22|
+--------------------+----------+
import org.apache.spark.sql.functions._
readBooksDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [book_name: string, book_price: float]
SQL
select book_name,book_price from books_vw order by book_price desc limit 3;
Next steps
To perform table copy operations, see: