从 Spark 连接到 Azure Cosmos DB Cassandra API

适用于: Cassandra API

本文是有关 Spark 中 Azure Cosmos DB Cassandra API 集成的系列文章中的一篇。 这些文章介绍了连接、数据定义语言 (DDL) 操作、基本数据操作语言 (DML) 操作,以及 Spark 中的高级 Azure Cosmos DB Cassandra API 集成。

先决条件

连接的依赖项

  • Cassandra 的 Spark 连接器: Spark 连接器用于连接到 Azure Cosmos DB Cassandra API。 请识别并使用 Maven 中心内与 Spark 环境的 Spark 和 Scala 版本兼容的连接器版本。 建议使用支持 Spark 3.0 或更高版本的环境,以及在 Maven 坐标 com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0 处提供的 Spark 连接器。 如果使用 Spark 2.x,我们建议使用带 Spark 版本 2.4.5 的环境,并使用在 Maven 坐标 com.datastax.spark:spark-cassandra-connector_2.11:2.4.3 处提供的 Spark 连接器。

  • 适用于 Cassandra API 的 Azure Cosmos DB 帮助程序库:如果使用的是 Spark 2.x 版本,则除了 Spark 连接器以外,还需要名为 azure-cosmos-cassandra-spark-helper、位于 Azure Cosmos DB 中 Maven 坐标 com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.2.0 处的另一个库,这样才能应对速率限制。 此库包含自定义连接工厂和重试策略类。

    Azure Cosmos DB 中的重试策略配置为处理 HTTP 状态代码 429(“请求速率太大”)异常。 Azure Cosmos DB Cassandra API 在 Cassandra 本机协议中将这些异常解释为过载错误,你可以结合退让进行重试。 由于 Azure Cosmos DB 使用预配的吞吐量模型,当传入/传出速率增大时,会发生请求速率限制异常。 重试策略可以防范 Spark 作业出现数据高峰(短暂性地超过为容器分配的吞吐量)。 如果使用 Spark 3.x 连接器,则不需要实现此库。

    备注

    重试策略只能防范 Spark 作业出现短暂的高峰。 如果尚未配置用于运行工作负荷的足够 RU,则重试策略不适用,并且重试策略类会再次引发异常。

  • Azure Cosmos DB 帐户连接详细信息: Azure Cassandra API 帐户名称、帐户终结点和密钥。

优化 Spark 连接器吞吐量配置

下一部分列出了用于通过适用于 Cassandra 的 Spark 连接器控制吞吐量的所有相关参数。 若要优化参数以最大程度地提高 Spark 作业的吞吐量,需要正确配置 spark.cassandra.output.concurrent.writesspark.cassandra.concurrent.readsspark.cassandra.input.reads_per_sec 配置,以免发生过多的限制和退避(进而导致吞吐量下降)。

这些配置的最佳值取决于 4 个因素:

  • 为要将数据引入到的表配置吞吐量(请求单位)。
  • Spark 群集中的工作器数。
  • 为 Spark 作业配置的执行器数(可以根据 Spark 版本使用 spark.cassandra.connection.connections_per_executor_maxspark.cassandra.connection.remoteConnectionsPerExecutor 进行控制)
  • 将每个请求发往 Cosmos DB 所存在的平均延迟(如果你在同一个数据中心内操作)。 对于写入,假设此值为 10 毫秒;对于读取,假设为 3 毫秒。

例如,如果我们有 5 个工作器并指定了值 spark.cassandra.output.concurrent.writes = 1 和值 spark.cassandra.connection.remoteConnectionsPerExecutor = 1,则有 5 个工作器以并发方式写入到表中,每个工作器占用 1 个线程。 如果执行一次写入需要 10 毫秒,则我们每秒可以通过每个线程发送 100 个请求(1000 毫秒除以 10)。 使用 5 个工作器时,每秒可以写入 500 次。 根据每次写入消耗 5 个请求单位这一平均成本,需要为目标表预配至少 2500 个请求单位(5 RU x 每秒 500 次写入)。

增加执行器数可以增加给定作业中的线程数,进而增加吞吐量。 但是,这种做法产生的确切影响可能根据具体的作业而异,而通过工作器数控制吞吐量则更具确定性。 还可以通过分析给的请求获取请求单位 (RU) 费用,来确定该请求的确切成本。 在为表或密钥空间预配吞吐量时,这样可以帮助你提供更准确的配置。 请查看此文,了解如何在每个请求的级别获取请求单位费用。

备注

上述指导假设数据合理且统一分布。 如果数据中存在明显的偏差(即,对同一分区键值的读取/写入次数异常大),则即使在表中预配大量的请求单元,也仍可能会遇到瓶颈。 请求单位在物理分区中平均分配,重度的数据倾斜可能会导致对单个分区的请求出现瓶颈。

Spark 连接器吞吐量配置参数

下表列出了连接器提供的特定于 Azure Cosmos DB Cassandra API 的吞吐量配置参数。 有关所有配置参数的详细列表,请参阅 Spark Cassandra 连接器 GitHub 存储库的配置参考页。

属性名称 默认值 说明
spark.cassandra.output.batch.size.rows 1 每个批的行数。 请将此参数设置为 1。 此参数用于提高重型工作负荷的吞吐量。
spark.cassandra.connection.connections_per_executor_max (Spark 2.x) spark.cassandra.connection.remoteConnectionsPerExecutor (Spark 3.x) 每个执行器的每个节点的最大连接数。 10*n 相当于 n 节点 Cassandra 群集中每个节点可以建立 10 个连接。 因此,如果需要为 5 节点 Cassandra 群集的每个执行器的每个节点建立 5 个连接,则应将此配置设置为 25。 请根据为 Spark 作业配置的并行度或执行器数目修改此值。
spark.cassandra.output.concurrent.writes 100 定义每个执行器可以执行的并行写入数。 由于“batch.size.rows”设置为 1,因此请务必相应地增大此值。 请根据工作负荷要实现的并行度或吞吐量修改此值。
spark.cassandra.concurrent.reads 512 定义每个执行器可以执行的并行读取数。 请根据工作负荷要实现的并行度或吞吐量修改此值
spark.cassandra.output.throughput_mb_per_sec 定义每个执行器的总写入吞吐量。 可将此参数用作 Spark 作业吞吐量的上限,并根据 Cosmos 容器的预配吞吐量修改此参数。
spark.cassandra.input.reads_per_sec 定义每个执行器的总读取吞吐量。 可将此参数用作 Spark 作业吞吐量的上限,并根据 Cosmos 容器的预配吞吐量修改此参数。
spark.cassandra.output.batch.grouping.buffer.size 1000 定义每个 Spark 任务的、在发送到 Cassandra API 之前可以存储在内存中的批数
spark.cassandra.connection.keep_alive_ms 60000 定义在经过多长的时间之后未使用的连接可供使用。

请根据 Spark 作业的预期工作负荷,以及为 Cosmos DB 帐户预配的吞吐量,来调整这些参数的吞吐量和并行度。

从 Spark 连接到 Azure Cosmos DB Cassandra API

cqlsh

以下命令详细说明如何从 cqlsh 连接到 Azure CosmosDB Cassandra API。 在 Spark 中运行示例时,可以使用此命令进行验证。
从 Linux/Unix/Mac:

export SSL_VERSION=TLSv1_2
export SSL_VALIDATE=false
cqlsh.py YOUR-COSMOSDB-ACCOUNT-NAME.cassandra.cosmos.azure.cn 10350 -u YOUR-COSMOSDB-ACCOUNT-NAME -p YOUR-COSMOSDB-ACCOUNT-KEY --ssl

1. Azure Databricks

以下文章介绍了 Azure Databricks 群集预配、连接到 Azure Cosmos DB Cassandra API 时所需的群集配置,以及用于演示 DDL 操作、DML 操作等的几个示例 Notebook。
通过 Azure Databricks 使用 Azure Cosmos DB Cassandra API

2. Azure HDInsight-Spark

以下文章介绍了 HDinsight-Spark 服务、预配、连接到 Azure Cosmos DB Cassandra API 时所需的群集配置,以及用于演示 DDL 操作、DML 操作等的几个示例 Notebook。
在 HDInsight-Spark 中使用 Azure Cosmos DB Cassandra API

3. 常规 Spark 环境

前面的部分与基于 Azure Spark 的 PaaS 服务相关,本部分介绍任何常规 Spark 环境。 下面详细介绍了连接器依赖项、导入和 Spark 会话配置。 “后续步骤”部分提供了 DDL 操作、DML 操作等的代码示例。

连接器依赖项:

  1. 添加 Maven 坐标以获取 Spark 的 Cassandra 连接器
  2. 添加 Maven 坐标以获取 Cassandra API 的 Azure Cosmos DB 帮助器库

导入:

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//CosmosDB library for multiple retry
import com.microsoft.azure.cosmosdb.cassandra

Spark 会话配置:

//Connection-related
spark.conf.set("spark.cassandra.connection.host","YOUR_ACCOUNT_NAME.cassandra.cosmos.azure.cn")
spark.conf.set("spark.cassandra.connection.port","10350")
spark.conf.set("spark.cassandra.connection.ssl.enabled","true")
spark.conf.set("spark.cassandra.auth.username","YOUR_ACCOUNT_NAME")
spark.conf.set("spark.cassandra.auth.password","YOUR_ACCOUNT_KEY")
spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")

//Throughput-related. You can adjust the values as needed
spark.conf.set("spark.cassandra.output.batch.size.rows", "1")
//spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10") // Spark 2.x
spark.conf.set("spark.cassandra.connection.remoteConnectionsPerExecutor", "10") // Spark 3.x
spark.conf.set("spark.cassandra.output.concurrent.writes", "1000")
spark.conf.set("spark.cassandra.concurrent.reads", "512")
spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000")
spark.conf.set("spark.cassandra.connection.keep_alive_ms", "600000000")

后续步骤

以下文章演示了 Spark 与 Azure Cosmos DB Cassandra API 的集成。