Read data from Azure Cosmos DB Cassandra API tables using Spark

Applies to: Cassandra API

This article describes how to read data stored in Azure Cosmos DB Cassandra API from Spark.

Cassandra API configuration

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//CosmosDB library for multiple retry
import com.microsoft.azure.cosmosdb.cassandra

//Connection-related
spark.conf.set("spark.cassandra.connection.host","YOUR_ACCOUNT_NAME.cassandra.cosmos.azure.cn")
spark.conf.set("spark.cassandra.connection.port","10350")
spark.conf.set("spark.cassandra.connection.ssl.enabled","true")
spark.conf.set("spark.cassandra.auth.username","YOUR_ACCOUNT_NAME")
spark.conf.set("spark.cassandra.auth.password","YOUR_ACCOUNT_KEY")
spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")
//Throughput-related...adjust as needed
spark.conf.set("spark.cassandra.output.batch.size.rows", "1")
spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10")
spark.conf.set("spark.cassandra.output.concurrent.writes", "1000")
spark.conf.set("spark.cassandra.concurrent.reads", "512")
spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000")
spark.conf.set("spark.cassandra.connection.keep_alive_ms", "600000000")
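The examples in this article assume a `books_ks` keyspace containing a `books` table. If your account does not have them yet, they can be created from Spark through the connector's `CassandraConnector` (imported above). This is a minimal sketch, and the column definitions are assumptions inferred from the queries that follow:

```scala
import com.datastax.spark.connector.cql.CassandraConnector

// Create the keyspace and table assumed by the read examples in this article.
// Column names and types are assumptions based on the sample queries below.
val connector = CassandraConnector(sc.getConf)
connector.withSessionDo { session =>
  session.execute(
    "CREATE KEYSPACE IF NOT EXISTS books_ks WITH REPLICATION = " +
    "{'class': 'SimpleStrategy', 'replication_factor': 1}")
  session.execute(
    "CREATE TABLE IF NOT EXISTS books_ks.books(" +
    "book_id TEXT PRIMARY KEY, book_name TEXT, book_author TEXT, book_pub_year INT)")
}
```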

Dataframe API

Read a table using the session.read.format command

val readBooksDF = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load

readBooksDF.explain
readBooksDF.show

Read a table using spark.read.cassandraFormat

val readBooksDF = spark.read.cassandraFormat("books", "books_ks").load()

Read specific columns from a table

val readBooksDF = spark
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load
  .select("book_name","book_author", "book_pub_year")

readBooksDF.printSchema
readBooksDF.explain
readBooksDF.show

Apply filters

You can push down predicates to the database to allow for better optimized Spark queries. A predicate is a condition on a query that returns true or false, typically located in the WHERE clause. A predicate push down filters the data in the database query, reducing the number of entries retrieved from the database and improving query performance. By default, the Spark Dataset API automatically pushes down valid WHERE clauses to the database.

val df = spark.read.cassandraFormat("books", "books_ks").load
df.explain
val dfWithPushdown = df.filter(df("book_pub_year") > 1891)
dfWithPushdown.explain


The PushedFilters section of the physical plan includes the GreaterThan push down filter.


RDD API

Read table

val bookRDD = sc.cassandraTable("books_ks", "books")
bookRDD.take(5).foreach(println)

Read specific columns from a table

val booksRDD = sc.cassandraTable("books_ks", "books").select("book_id","book_name").cache
booksRDD.take(5).foreach(println)
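The RDD API can also push filtering down to the database with `where`, which appends a CQL WHERE clause to the query the connector issues. A small sketch, mirroring the year filter from the Dataframe example above (whether the clause is servable depends on the table's schema and may require an index):

```scala
// Push a CQL WHERE clause down to the database via the RDD API.
// book_pub_year is assumed to be filterable on the server side.
val filteredRDD = sc.cassandraTable("books_ks", "books")
  .select("book_name", "book_pub_year")
  .where("book_pub_year > ?", 1891)
filteredRDD.take(5).foreach(println)
```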

SQL Views

Create a temporary view from a dataframe

spark
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load.createOrReplaceTempView("books_vw")

Run queries against the view

SELECT * FROM books_vw WHERE book_pub_year > 1891
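From a Scala notebook cell or program, the same query can be run with `spark.sql`, which returns a dataframe:

```scala
// Query the temporary view registered above using Spark SQL.
val sqlResultDF = spark.sql("SELECT * FROM books_vw WHERE book_pub_year > 1891")
sqlResultDF.show
```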

Next steps

The following are additional articles on working with Azure Cosmos DB Cassandra API from Spark: