Aggregate operations on Azure Cosmos DB Cassandra API tables from Spark

This article describes basic aggregation operations against Azure Cosmos DB Cassandra API tables from Spark.

Note

Server-side filtering and server-side aggregation are currently not supported in Azure Cosmos DB Cassandra API.
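
Because these operations aren't pushed down to the service, any predicate or aggregate shown in this article is evaluated by Spark after the rows are read from the table. The snippet below is a minimal sketch only; it assumes the connection configuration and the books_ks.books sample table that are set up later in this article, with book_pub_year stored as an int.

import com.datastax.spark.connector._

//Sketch: the predicate below is evaluated by Spark after the rows are read,
//not pushed down to the service
sc.cassandraTable("books_ks", "books")
  .filter(row => row.getInt("book_pub_year") > 1900)
  .count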

Cassandra API configuration

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//CosmosDB library for multiple retry
import com.microsoft.azure.cosmosdb.cassandra

//Connection-related
spark.conf.set("spark.cassandra.connection.host","YOUR_ACCOUNT_NAME.cassandra.cosmos.azure.cn")
spark.conf.set("spark.cassandra.connection.port","10350")
spark.conf.set("spark.cassandra.connection.ssl.enabled","true")
spark.conf.set("spark.cassandra.auth.username","YOUR_ACCOUNT_NAME")
spark.conf.set("spark.cassandra.auth.password","YOUR_ACCOUNT_KEY")
spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")
//Throughput-related...adjust as needed
spark.conf.set("spark.cassandra.output.batch.size.rows", "1")
spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10")
spark.conf.set("spark.cassandra.output.concurrent.writes", "1000")
spark.conf.set("spark.cassandra.concurrent.reads", "512")
spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000")
spark.conf.set("spark.cassandra.connection.keep_alive_ms", "600000000")

Sample data generator

// Generate a simple dataset containing five records
val booksDF = Seq(
   ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
   ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
   ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
   ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
   ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")

booksDF.write
  .mode("append")
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
  .save()

Count operation

RDD API

sc.cassandraTable("books_ks", "books").count

Output:

res48: Long = 5

Dataframe API

Count against dataframes is currently not supported. The sample below shows how to execute a dataframe count after persisting the dataframe to memory as a workaround.

To avoid running into "out of memory" issues, choose a storage option from the following available options:

  • MEMORY_ONLY: This is the default storage option. It stores the RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions are not cached and are recomputed on the fly each time they're needed.

  • MEMORY_AND_DISK: Stores the RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, the partitions that don't fit are stored on disk and read from there whenever they're needed.

  • MEMORY_ONLY_SER (Java/Scala): Stores the RDD as serialized Java objects - one byte array per partition. This option is space-efficient compared to deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.

  • MEMORY_AND_DISK_SER (Java/Scala): This storage option is like MEMORY_ONLY_SER; the only difference is that it spills partitions that don't fit in memory to disk instead of recomputing them when they're needed.

  • DISK_ONLY: Stores the RDD partitions on disk only.

  • MEMORY_ONLY_2, MEMORY_AND_DISK_2, and so on: Same as the levels above, but each partition is replicated on two cluster nodes.

  • OFF_HEAP (experimental): Similar to MEMORY_ONLY_SER, but stores the data in off-heap memory; off-heap memory must be enabled ahead of time.

    //Workaround
    import org.apache.spark.storage.StorageLevel
    
    //Read from source
    val readBooksDF = spark
      .read
      .cassandraFormat("books", "books_ks", "")
      .load()
    
    //Explain plan
    readBooksDF.explain
    
    //Materialize the dataframe
    readBooksDF.persist(StorageLevel.MEMORY_ONLY)
    
    //Subsequent execution against this DF hits the cache 
    readBooksDF.count
    
    //Persist as temporary view
    readBooksDF.createOrReplaceTempView("books_vw")
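
    //A sketch of an alternative (not part of the original workaround): if the
    //dataframe may not fit in executor memory, MEMORY_AND_DISK (or another
    //storage level from the list above) can be passed to persist instead, and
    //unpersist() releases the cache once it is no longer needed, for example:
    //  readBooksDF.persist(StorageLevel.MEMORY_AND_DISK)
    //  readBooksDF.unpersist()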
    

SQL

select * from books_vw;
select count(*) from books_vw where book_pub_year > 1900;
select count(book_id) from books_vw;
select book_author, count(*) as count from books_vw group by book_author;
select count(*) from books_vw;

Average operation

RDD API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Double) => c).mean

Output:

res24: Double = 16.016000175476073

Dataframe API

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(avg("book_price"))
  .show

Output:

+------------------+
|   avg(book_price)|
+------------------+
|16.016000175476073|
+------------------+

SQL

select avg(book_price) from books_vw;

Output:

16.016000175476073

Min operation

RDD API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).min

Output:

res31: Float = 11.33

Dataframe API

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_id","book_price")
  .agg(min("book_price"))
  .show

Output:

+---------------+
|min(book_price)|
+---------------+
|          11.33|
+---------------+

SQL

select min(book_price) from books_vw;

Output:

11.33

Max operation

RDD API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).max

Dataframe API

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(max("book_price"))
  .show

Output:

+---------------+
|max(book_price)|
+---------------+
|          22.45|
+---------------+

SQL

select max(book_price) from books_vw;

Output:

22.45

Sum operation

RDD API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).sum

Output:

res46: Double = 80.08000087738037

Dataframe API

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(sum("book_price"))
  .show

Output:

+-----------------+
|  sum(book_price)|
+-----------------+
|80.08000087738037|
+-----------------+

SQL

select sum(book_price) from books_vw;

Output:

80.08000087738037

Top or comparable operation

RDD API

val readCalcTopRDD = sc.cassandraTable("books_ks", "books").select("book_name","book_price").sortBy(_.getFloat(1), false)
readCalcTopRDD.zipWithIndex.filter(_._2 < 3).collect.foreach(println)
//delivers the first top n items without collecting the rdd to the driver.

Output:

(CassandraRow{book_name: A sign of four, book_price: 22.45},0)
(CassandraRow{book_name: The adventures of Sherlock Holmes, book_price: 19.83},1)
(CassandraRow{book_name: The memoirs of Sherlock Holmes, book_price: 14.22},2)
readCalcTopRDD: org.apache.spark.rdd.RDD[com.datastax.spark.connector.CassandraRow] = MapPartitionsRDD[430] at sortBy at command-2371828989676374:1

Dataframe API

import org.apache.spark.sql.functions._

val readBooksDF = spark.read.format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load
  .select("book_name","book_price")
  .orderBy(desc("book_price"))
  .limit(3)

//Explain plan
readBooksDF.explain

//Top
readBooksDF.show

Output:

== Physical Plan ==
TakeOrderedAndProject(limit=3, orderBy=[book_price#1840 DESC NULLS LAST], output=[book_name#1839,book_price#1840])
+- *(1) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@29cd5f58 [book_name#1839,book_price#1840] PushedFilters: [], ReadSchema: struct<book_name:string,book_price:float>
+--------------------+----------+
|           book_name|book_price|
+--------------------+----------+
|      A sign of four|     22.45|
|The adventures of...|     19.83|
|The memoirs of Sh...|     14.22|
+--------------------+----------+

import org.apache.spark.sql.functions._
readBooksDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [book_name: string, book_price: float]

SQL

select book_name,book_price from books_vw order by book_price desc limit 3;

Next steps

To perform table copy operations, see: