通过 HDInsight 从 YARN 上的 Spark 访问 Azure Cosmos DB for Apache Cassandra

适用对象: Cassandra

本文介绍如何通过 spark-shell 中的 HDInsight-Spark 从 YARN 上的 Spark 访问 Azure Cosmos DB for Apache Cassandra。 HDInsight 是 Azure 上的 Hortonworks Hadoop PaaS。 它为 HDFS 使用对象存储,并采用多种风格,其中包括 Spark。 虽然本文提到了 HDInsight-Spark,但它适用于所有 Hadoop 发行版。

先决条件

在开始之前,请回顾连接到 Azure Cosmos DB for Apache Cassandra 的基础知识

需要具备以下先决条件:

  • 预配 Azure Cosmos DB for Apache Cassandra。 请参阅创建数据库帐户

  • 预配 HDInsight-Spark 群集。 请参阅使用 ARM 模板在 Azure HDInsight 中创建 Apache Spark 群集

  • Spark2 中的 API for Cassandra 配置。 Cassandra 的 Spark 连接器要求将 Cassandra 连接详细信息作为 Spark 上下文的一部分初始化。 当你启动 Jupyter 笔记本时,Spark 会话和上下文已初始化。 除非已完成设置为 HDInsight 默认 Jupyter 笔记本启动项的一部分的每个配置,否则请不要停止和重新初始化 Spark 上下文。 解决办法之一是将 Cassandra 实例详细信息直接添加到 Ambari 中的 Spark2 服务配置。 此方法是针对每个需要重启 Spark2 服务的群集的一次性活动。

    1. 依次转到 Ambari 和 Spark2 服务,然后选择“configs”。

    2. 转到自定义 spark2-defaults 并添加包含以下内容的新属性,然后重启 Spark2 服务:

    spark.cassandra.connection.host=YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.cn<br>
    spark.cassandra.connection.port=10350<br>
    spark.cassandra.connection.ssl.enabled=true<br>
    spark.cassandra.auth.username=YOUR_COSMOSDB_ACCOUNT_NAME<br>
    spark.cassandra.auth.password=YOUR_COSMOSDB_KEY<br>
    

可以使用 cqlsh 进行验证。 有关详细信息,请参阅从 Spark 连接到 Azure Cosmos DB for Apache Cassandra

通过 Spark Shell 访问 Azure Cosmos DB for Apache Cassandra

Spark shell 用于测试和探索。

  • 启动 spark-shell,其中包含与群集的 Spark 版本兼容的所需 maven 依赖项。

    spark-shell --packages "com.datastax.spark:spark-cassandra-connector_2.11:2.3.0,com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.0.0"
    
  • 执行一些 DDL 和 DML 操作

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    import spark.implicits._
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.Column
    import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType,LongType,FloatType,DoubleType, TimestampType}
    import org.apache.spark.sql.cassandra._
    
    //Spark connector
    import com.datastax.spark.connector._
    import com.datastax.spark.connector.cql.CassandraConnector
    
    //CosmosDB library for multiple retry
    import com.microsoft.azure.cosmosdb.cassandra
    
    // Specify connection factory for Cassandra
    spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")
    
    // Parallelism and throughput configs
    spark.conf.set("spark.cassandra.output.batch.size.rows", "1")
    spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10")
    spark.conf.set("spark.cassandra.output.concurrent.writes", "100")
    spark.conf.set("spark.cassandra.concurrent.reads", "512")
    spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000")
    spark.conf.set("spark.cassandra.connection.keep_alive_ms", "60000000") //Increase this number as needed
    
  • 运行 CRUD 操作

    //1) Create table if it does not exist
    val cdbConnector = CassandraConnector(sc)
    cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT PRIMARY KEY,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT) WITH cosmosdb_provisioned_throughput=4000;"))
    
    //2) Delete data from potential prior runs
    cdbConnector.withSessionDo(session => session.execute("DELETE FROM books_ks.books WHERE book_id IN ('b00300','b00001','b00023','b00501','b09999','b01001','b00999','b03999','b02999','b000009');"))
    
    //3) Generate a few rows
    val booksDF = Seq(
    ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
    ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
    ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
    ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
    ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
    ).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")
    
    //4) Persist
    booksDF.write.mode("append").format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000")).save()
    
    //5) Read the data in the table
    spark.read.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks")).load.show
    

从 Jupyter Notebook 访问 Azure Cosmos DB for Apache Cassandra

HDInsight Spark 附带有 Zeppelin 和 Jupyter Notebook 服务。 二者均为支持 Scala 和 Python 的基于 Web 的笔记本环境。 笔记本非常适合用于交互式探索性分析和协作,但不适用于运营或生产流程。

以下 Jupyter Notebook 可以上传到 HDInsight Spark 群集,并提供使用 Azure Cosmos DB for Apache Cassandra 的现成示例。 请务必查看第一个笔记本 1.0-ReadMe.ipynb,以查看用于连接到 Azure Cosmos DB for Apache Cassandra 的 Spark 服务配置。

azure-cosmos-db-cassandra-api-spark-notebooks-jupyter 下的笔记本下载到计算机。

如何上传

启动 Jupyter 时,导航到 Scala。 创建一个目录,然后将笔记本上传到该目录。 “上传”按钮位于右上方。

如何运行

按顺序运行每个笔记本及其包含的单元。 选择每个笔记本顶部的“运行”按钮来运行所有单元,或按 Shift+Enter 运行每个单元。

从 Spark Scala 程序访问 Azure Cosmos DB for Apache Cassandra

对于生产环境中的自动化流程,需使用 spark-submit 将 Spark 程序提交到群集。

后续步骤