Connect to Azure Cosmos DB Cassandra API from Spark

This article is one among a series of articles on Azure Cosmos DB Cassandra API integration from Spark. The articles cover connectivity, Data Definition Language (DDL) operations, basic Data Manipulation Language (DML) operations, and advanced Azure Cosmos DB Cassandra API integration from Spark.

Prerequisites

Dependencies for connectivity

  • Spark connector for Cassandra: The Spark connector is used to connect to Azure Cosmos DB Cassandra API. Identify and use the version of the connector in Maven Central that is compatible with the Spark and Scala versions of your Spark environment.

  • Azure Cosmos DB helper library for Cassandra API: In addition to the Spark connector, you need another library from Azure Cosmos DB called azure-cosmos-cassandra-spark-helper. This library contains custom connection factory and retry policy classes.

    The retry policy in Azure Cosmos DB is configured to handle HTTP status code 429 ("Request Rate Large") exceptions. The Azure Cosmos DB Cassandra API translates these exceptions into overloaded errors on the Cassandra native protocol, and you can retry with back-offs. Because Azure Cosmos DB uses a provisioned throughput model, request rate limiting exceptions occur when the ingress/egress rates increase. The retry policy protects your Spark jobs against data spikes that momentarily exceed the throughput allocated for your container.

    Note

    The retry policy can protect your Spark jobs against momentary spikes only. If you have not configured enough RUs to run your workload, the retry policy does not apply, and the retry policy class rethrows the exception.

  • Azure Cosmos DB account connection details: Your Azure Cosmos DB Cassandra API account name, account endpoint, and key.
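The retry-with-back-off behavior described above can be sketched in plain Scala. This is only an illustrative model of exponential back-off for rate-limited (429/overloaded) responses; the object and parameter names here are assumptions for illustration, not the actual API of azure-cosmos-cassandra-spark-helper:

```scala
// Illustrative model of exponential back-off for rate-limited requests.
// The names and defaults are assumptions, not the helper library's real API.
object BackoffSketch {
  // Delay before the nth retry attempt (0-based): base * 2^attempt, capped.
  def delayMs(attempt: Int, baseDelayMs: Long = 100L, maxDelayMs: Long = 5000L): Long =
    math.min(baseDelayMs * (1L << attempt), maxDelayMs)
}
```

Each rate-limited attempt waits roughly twice as long as the previous one, up to a fixed ceiling, which is the standard way to drain a momentary spike without hammering an already-overloaded endpoint.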

Spark connector throughput configuration parameters

The following table lists the Azure Cosmos DB Cassandra API-specific throughput configuration parameters provided by the connector. For a detailed list of all configuration parameters, see the configuration reference page of the Spark Cassandra Connector GitHub repository.

| Property name | Default value | Description |
| --- | --- | --- |
| spark.cassandra.output.batch.size.rows | 1 | Number of rows per single batch. Set this parameter to 1. This parameter is used to achieve higher throughput for heavy workloads. |
| spark.cassandra.connection.connections_per_executor_max | None | Maximum number of connections per node per executor. 10*n is equivalent to 10 connections per node in an n-node Cassandra cluster. So, if you require 5 connections per node per executor for a 5-node Cassandra cluster, set this configuration to 25. Modify this value based on the degree of parallelism or the number of executors that your Spark jobs are configured for. |
| spark.cassandra.output.concurrent.writes | 100 | Defines the number of parallel writes that can occur per executor. Because "batch.size.rows" is set to 1, make sure to scale up this value accordingly. Modify this value based on the degree of parallelism or the throughput that you want to achieve for your workload. |
| spark.cassandra.concurrent.reads | 512 | Defines the number of parallel reads that can occur per executor. Modify this value based on the degree of parallelism or the throughput that you want to achieve for your workload. |
| spark.cassandra.output.throughput_mb_per_sec | None | Defines the total write throughput per executor. This parameter can be used as an upper limit for your Spark job throughput; base it on the provisioned throughput of your Cosmos container. |
| spark.cassandra.input.reads_per_sec | None | Defines the total read throughput per executor. This parameter can be used as an upper limit for your Spark job throughput; base it on the provisioned throughput of your Cosmos container. |
| spark.cassandra.output.batch.grouping.buffer.size | 1000 | Defines the number of batches per single Spark task that can be stored in memory before being sent to the Cassandra API. |
| spark.cassandra.connection.keep_alive_ms | 60000 | Defines the period of time for which unused connections are kept available. |
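As a worked example of the connections_per_executor_max arithmetic from the table, the value to set is simply (connections per node per executor) times (number of nodes); a tiny helper makes the calculation explicit (the object name is ours, for illustration only):

```scala
// Worked example of the connections_per_executor_max arithmetic: each
// executor opens up to (connections per node) connections to every node,
// so the per-executor maximum is the product of the two.
object ConnectionMath {
  def connectionsPerExecutorMax(connectionsPerNode: Int, nodeCount: Int): Int =
    connectionsPerNode * nodeCount
}
```

For the table's example of 5 connections per node against a 5-node cluster, this yields 25.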

Adjust the throughput and degree of parallelism of these parameters based on the workload you expect for your Spark jobs and the throughput you have provisioned for your Cosmos DB account.

Connecting to Azure Cosmos DB Cassandra API from Spark

cqlsh

The following commands detail how to connect to Azure Cosmos DB Cassandra API from cqlsh. This is useful for validation as you run through the samples in Spark.
From Linux/Unix/Mac:

export SSL_VERSION=TLSv1_2
export SSL_VALIDATE=false
cqlsh.py YOUR-COSMOSDB-ACCOUNT-NAME.cassandra.cosmos.azure.cn 10350 -u YOUR-COSMOSDB-ACCOUNT-NAME -p YOUR-COSMOSDB-ACCOUNT-KEY --ssl

1. Azure HDInsight-Spark

The article below covers the HDInsight-Spark service, provisioning, cluster configuration for connecting to Azure Cosmos DB Cassandra API, and several sample notebooks that cover DDL operations, DML operations, and more.
Work with Azure Cosmos DB Cassandra API from Azure HDInsight-Spark

2. Spark environment in general

While the section above was specific to an Azure Spark-based PaaS service, this section covers any general Spark environment. Connector dependencies, imports, and Spark session configuration are detailed below. The "Next steps" section covers code samples for DDL operations, DML operations, and more.

Connector dependencies:

  1. Add the Maven coordinates to get the Cassandra connector for Spark
  2. Add the Maven coordinates for the Azure Cosmos DB helper library for Cassandra API
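For example, in an sbt build the two dependencies might be declared as shown below. The version numbers here are placeholders, not recommendations — always pick the connector version from Maven Central that matches the Spark and Scala versions of your environment, and the helper library version that matches your connector:

```scala
// build.sbt sketch — versions are illustrative placeholders only.
// Choose versions from Maven Central that match your Spark/Scala environment.
libraryDependencies ++= Seq(
  "com.datastax.spark" %% "spark-cassandra-connector" % "2.4.3",
  "com.microsoft.azure.cosmosdb" % "azure-cosmos-cassandra-spark-helper" % "1.0.0"
)
```

In notebook environments (Databricks, HDInsight Jupyter), the same Maven coordinates are attached through the cluster's library management UI instead of a build file.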

Imports:

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//Azure Cosmos DB helper library (connection factory and retry policy)
import com.microsoft.azure.cosmosdb.cassandra

Spark session configuration:

//Connection-related
spark.conf.set("spark.cassandra.connection.host","YOUR_ACCOUNT_NAME.cassandra.cosmos.azure.cn")
spark.conf.set("spark.cassandra.connection.port","10350")
spark.conf.set("spark.cassandra.connection.ssl.enabled","true")
spark.conf.set("spark.cassandra.auth.username","YOUR_ACCOUNT_NAME")
spark.conf.set("spark.cassandra.auth.password","YOUR_ACCOUNT_KEY")
spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")

//Throughput-related. You can adjust the values as needed
spark.conf.set("spark.cassandra.output.batch.size.rows", "1")
spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10")
spark.conf.set("spark.cassandra.output.concurrent.writes", "1000")
spark.conf.set("spark.cassandra.concurrent.reads", "512")
spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000")
spark.conf.set("spark.cassandra.connection.keep_alive_ms", "600000000")
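Once the session is configured, you can smoke-test connectivity with a simple read. The keyspace and table names below ("books_ks", "books") are placeholders — substitute a keyspace and table that exist in your account:

```scala
import org.apache.spark.sql.cassandra._

// Connectivity smoke test: read a few rows from an existing table.
// "books_ks" and "books" are placeholder names, not part of your account.
val readDF = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "books_ks", "table" -> "books"))
  .load()

readDF.show(5)
```

If the account name, key, or SSL settings in the session configuration are wrong, this read fails immediately, which makes it a quick check before running the larger DDL/DML samples.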

Next steps

The following articles demonstrate Spark integration with Azure Cosmos DB Cassandra API.