Compartilhar via

通过 HDInsight 从 YARN 上的 Spark 访问 Azure Cosmos DB for Apache Cassandra

适用对象: 卡珊德拉

重要

你是否正在寻找一种数据库解决方案,以应对需要高扩展性、99.999% 可用性服务级别协议(SLA)、即时自动扩展和跨多个区域的自动故障转移的场景? 请考虑使用 Azure Cosmos DB for NoSQL

本文介绍如何通过 HDInsight-Spark 从运行在 YARN 上的 Spark 访问来自 spark-shell 的适用于 Apache Cassandra 的 Azure Cosmos DB。 HDInsight 是 Azure 上的 Hortonworks Hadoop PaaS。 它为 HDFS 使用对象存储,并采用多种风格,其中包括 Spark。 虽然本文提到了 HDInsight-Spark,但它适用于所有 Hadoop 发行版。

先决条件

在开始之前,请回顾连接到 Azure Cosmos DB for Apache Cassandra 的基础知识

需要具备以下先决条件:

  • 预配 Azure Cosmos DB for Apache Cassandra。 请参阅创建数据库帐户

  • 预配 HDInsight Spark 群集。 请参阅使用 ARM 模板在 Azure HDInsight 中创建 Apache Spark 群集

  • 用于 Cassandra 配置的 Spark2 API。 Cassandra 的 Spark 连接器要求将 Cassandra 连接详细信息作为 Spark 上下文的一部分初始化。 当你启动 Jupyter 笔记本时,Spark 会话和上下文已初始化。 除非已完成设置为 HDInsight 默认 Jupyter 笔记本启动项的一部分的每个配置,否则请不要停止和重新初始化 Spark 上下文。 解决办法之一是将 Cassandra 实例详细信息直接添加到 Ambari 中的 Spark2 服务配置。 此方法是针对每个需要重启 Spark2 服务的群集的一次性活动。

    1. 依次转到 Ambari 和 Spark2 服务,然后选择“configs”。

    2. 转到自定义 spark2-defaults 并添加包含以下内容的新属性,然后重启 Spark2 服务:

    spark.cassandra.connection.host=YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.cn<br>
    spark.cassandra.connection.port=10350<br>
    spark.cassandra.connection.ssl.enabled=true<br>
    spark.cassandra.auth.username=YOUR_COSMOSDB_ACCOUNT_NAME<br>
    spark.cassandra.auth.password=YOUR_COSMOSDB_KEY<br>
    

可以使用 cqlsh 进行验证。 有关详细信息,请参阅从 Spark 连接到 Azure Cosmos DB for Apache Cassandra

通过 Spark Shell 访问 Azure Cosmos DB for Apache Cassandra

Spark shell 用于测试和探索。

  • 启动 spark-shell,其中包含与群集的 Spark 版本兼容的所需 maven 依赖项。

    spark-shell --packages "com.datastax.spark:spark-cassandra-connector_2.11:2.3.0,com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.0.0"
    
  • 执行一些 DDL 和 DML 操作

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    import spark.implicits._
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.Column
    import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType,LongType,FloatType,DoubleType, TimestampType}
    import org.apache.spark.sql.cassandra._
    
    //Spark connector
    import com.datastax.spark.connector._
    import com.datastax.spark.connector.cql.CassandraConnector
    
    //CosmosDB library for multiple retry
    import com.microsoft.azure.cosmosdb.cassandra
    
    // Specify connection factory for Cassandra
    spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")
    
    // Parallelism and throughput configs
    spark.conf.set("spark.cassandra.output.batch.size.rows", "1")
    spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10")
    spark.conf.set("spark.cassandra.output.concurrent.writes", "100")
    spark.conf.set("spark.cassandra.concurrent.reads", "512")
    spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000")
    spark.conf.set("spark.cassandra.connection.keep_alive_ms", "60000000") //Increase this number as needed
    
  • 运行 CRUD 操作

    //1) Create table if it does not exist
    val cdbConnector = CassandraConnector(sc)
    cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT PRIMARY KEY,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT) WITH cosmosdb_provisioned_throughput=4000;"))
    
    //2) Delete data from potential prior runs
    cdbConnector.withSessionDo(session => session.execute("DELETE FROM books_ks.books WHERE book_id IN ('b00300','b00001','b00023','b00501','b09999','b01001','b00999','b03999','b02999','b000009');"))
    
    //3) Generate a few rows
    val booksDF = Seq(
    ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
    ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
    ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
    ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
    ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
    ).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")
    
    //4) Persist
    booksDF.write.mode("append").format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000")).save()
    
    //5) Read the data in the table
    spark.read.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks")).load.show
    

从 Jupyter Notebook 访问 Azure Cosmos DB for Apache Cassandra

HDInsight-Spark 附带有 Zeppelin 和 Jupyter 笔记本服务。 二者均为支持 Scala 和 Python 的基于 Web 的笔记本环境。 笔记本非常适合用于交互式探索性分析和协作,但不适用于运营或生产流程。

以下 Jupyter Notebook 可以上传到 HDInsight Spark 群集,并提供使用 Azure Cosmos DB for Apache Cassandra 的现成示例。 请务必查看第一个笔记本 1.0-ReadMe.ipynb,以查看用于连接到 Azure Cosmos DB for Apache Cassandra 的 Spark 服务配置。

azure-cosmos-db-cassandra-api-spark-notebooks-jupyter 下的笔记本下载到机器。

如何上传

启动 Jupyter 时,导航到 Scala。 创建一个目录,然后将笔记本上传到该目录。 “上传”按钮位于右上方。

如何运行

按顺序运行每个笔记本及其包含的单元。 选择每个笔记本顶部的“运行”按钮来运行所有单元格,或按 “Shift” “Enter” 键来运行每个单元格。

使用 Spark Scala 程序访问 Azure Cosmos DB for Apache Cassandra

对于生产环境中的自动化流程,需使用 spark-submit 将 Spark 程序提交到群集。

后续步骤