Important
Java SDK 中的吞吐量控制 API 使用 @Beta 进行批注,并可能会更改。 在升级之前查看 SDK 更改日志。
吞吐量控制组允许限制Azure Cosmos DB操作的请求单位(RU)消耗量。 如果有多个应用程序或工作负荷共享同一容器,并且希望确保一个工作负荷不会消耗所有可用的吞吐量,这非常有用。 从版本 4.13.0 开始,Azure Cosmos DB Java SDK v4 中提供了吞吐量控制。
Java SDK 支持三种吞吐量控制模式:
- 本地吞吐量控制 - 限制单个客户端实例中的 RU 消耗。
- 全局吞吐量控制 - 通过元数据容器协调,限制跨多个客户端实例的 RU 消耗。
- 服务器端吞吐量控制 - 使用服务器端吞吐量存储桶限制 RU 消耗。
先决条件
- Azure Cosmos DB Java SDK v4 >= 4.13.0(用于本地和多区域吞吐量控制)
- Azure Cosmos DB Java SDK v4 >= 4.54.0(适用于变更源处理器的吞吐量控制)
- Azure Cosmos DB Java SDK v4 >= 4.74.0(用于通过吞吐量桶实现服务器端吞吐量控制)
必需的导入
本文中的代码示例使用以下导入:
import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosAsyncDatabase;
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.GlobalThroughputControlConfig;
import com.azure.cosmos.ThroughputControlGroupConfig;
import com.azure.cosmos.ThroughputControlGroupConfigBuilder;
import com.azure.cosmos.models.CosmosItemRequestOptions;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.ChangeFeedProcessor;
import com.azure.cosmos.models.ChangeFeedProcessorBuilder;
import com.azure.identity.DefaultAzureCredentialBuilder;
import com.fasterxml.jackson.databind.JsonNode;
import java.time.Duration;
创建吞吐量控制组
ThroughputControlGroupConfigBuilder用于创建吞吐量控制组配置。 可以设置:
| 财产 | Description |
|---|---|
groupName |
控件组的唯一名称。 |
targetThroughput |
组的绝对 RU/秒限制(必须为 > 0)。 |
targetThroughputThreshold |
要分配给此组的总预配吞吐量的百分比(0,1]。 |
defaultControlGroup |
这是否是未分配给特定组的所有请求的默认组。 |
priorityLevel |
基于优先级的执行的优先级级别(PriorityLevel.HIGH 或 PriorityLevel.LOW)。 |
throughputBucket |
要分配给此组的服务器端吞吐量存储桶 ID (>= 0)。 与服务器端吞吐量控制一起使用。 |
continueOnInitError |
如果 true,则在控制组初始化失败时,操作将回退为使用容器的全部吞吐量。 |
注释
在构建吞吐量控制组时,必须至少设置 targetThroughput、targetThroughputThreshold、priorityLevel 或 throughputBucket 中的一个。 对于本地和多区域吞吐量控制,请设置 targetThroughput 或 targetThroughputThreshold。 对于服务器端吞吐量控制、设置 priorityLevel 或 throughputBucket /或两者。
本地吞吐量控制
本地吞吐量控制限制单个 CosmosAsyncClient 实例中的 RU 消耗量。 如果要隔离同一应用程序中不同类型的操作,这非常有用。
ThroughputControlGroupConfig groupConfig =
new ThroughputControlGroupConfigBuilder()
.groupName("localControlGroup")
.targetThroughputThreshold(0.1) // limit to 10% of provisioned throughput
.build();
container.enableLocalThroughputControlGroup(groupConfig);
启用本地吞吐量控制后,该容器上的所有操作仅限于此客户端实例中配置的 RU/s。
多个本地控件组
可以在同一容器上创建多个本地控件组。 例如,若要分隔读取和写入工作负荷,
// Limit reads to 25% of provisioned throughput
ThroughputControlGroupConfig readGroup =
new ThroughputControlGroupConfigBuilder()
.groupName("reads")
.targetThroughputThreshold(0.25)
.defaultControlGroup(true) // default group for all requests
.build();
container.enableLocalThroughputControlGroup(readGroup);
// Limit writes to a fixed 500 RU/s
ThroughputControlGroupConfig writeGroup =
new ThroughputControlGroupConfigBuilder()
.groupName("writes")
.targetThroughput(500)
.build();
container.enableLocalThroughputControlGroup(writeGroup);
若要将特定操作分配给非默认组,请使用 CosmosItemRequestOptions:
CosmosItemRequestOptions requestOptions = new CosmosItemRequestOptions();
requestOptions.setThroughputControlGroupName("writes");
container.createItem(newItem, new PartitionKey(newItem.getPartitionKey()), requestOptions)
.block();
全局吞吐量控制
全局吞吐量控制通过使用Azure Cosmos DB中的共享元数据容器协调多个客户端实例的 RU 消耗。 当有多个微服务或应用程序实例访问同一容器并且想要强制实施集体吞吐量限制时,这非常有用。
设置控制容器
多区域吞吐量控制功能需要元数据容器来跟踪客户端的 RU 使用情况。 可以为此创建专用数据库和容器,或使用现有容器。
// Create a dedicated database and container for throughput control metadata
client.createDatabaseIfNotExists("ThroughputControlDatabase").block();
CosmosAsyncDatabase throughputControlDatabase =
client.getDatabase("ThroughputControlDatabase");
throughputControlDatabase
.createContainerIfNotExists("ThroughputControlContainer", "/groupId")
.block();
启用多区域吞吐量控制
// Create the throughput control group config
ThroughputControlGroupConfig groupConfig =
new ThroughputControlGroupConfigBuilder()
.groupName("globalControlGroup")
.targetThroughputThreshold(0.25) // limit to 25% of provisioned throughput
.defaultControlGroup(true)
.build();
// Create the global control config pointing to the metadata container
GlobalThroughputControlConfig globalControlConfig =
client.createGlobalThroughputControlConfigBuilder(
"ThroughputControlDatabase",
"ThroughputControlContainer")
.setControlItemRenewInterval(Duration.ofSeconds(5))
.setControlItemExpireInterval(Duration.ofSeconds(11))
.build();
// Enable multiple-regional throughput control on the target container
container.enableGlobalThroughputControlGroup(groupConfig, globalControlConfig);
多区域吞吐量控制的配置选项
| 财产 | Description | 默认 |
|---|---|---|
controlItemRenewInterval |
每个客户端在元数据容器中更新其使用情况的频率(至少为 5 秒)。 | 5 秒 |
controlItemExpireInterval |
检测离线客户端以及重新分配其份额的速度。 必须为 > 2 × renewInterval + 1。 | 11 秒 |
Important
controlItemExpireInterval 必须大于 2 × controlItemRenewInterval + 1 秒。 使用较短的间隔会使系统更具响应能力,但会增加元数据开销。
服务器端吞吐量控制
服务器端吞吐量控制使用 吞吐量存储桶 在服务器级别强制实施 RU 限制。 此模式需要Azure Cosmos DB Java SDK v4 版本 4.74.0 或更高版本。
ThroughputControlGroupConfig groupConfig =
new ThroughputControlGroupConfigBuilder()
.groupName("serverSideControlGroup")
.throughputBucket(2) // assign to server-side bucket 2
.build();
container.enableServerThroughputControlGroup(groupConfig);
注释
服务器端吞吐量控制要求在吞吐量控制组配置中设置 priorityLevel 或 throughputBucket(或两者都设置)。
有关吞吐量存储桶的详细信息,请参阅 Azure Cosmos DB 中的吞吐量存储桶。
错误处理
当请求超出为其控制组分配的 RU 预算时,SDK 会在本地拒绝该请求并返回 CosmosException 状态 429 (Too Many Requests),而不向服务发送请求。 应用程序必须像往常一样继续处理 429 个响应(SDK 的内置重试策略适用)。
若要正常处理初始化错误,请设置为continueOnInitErrortrue:
ThroughputControlGroupConfig groupConfig =
new ThroughputControlGroupConfigBuilder()
.groupName("resilientGroup")
.targetThroughputThreshold(0.5)
.continueOnInitError(true) // fall back to full throughput on init failure
.build();
使用更改源处理器进行吞吐量控制
可以将吞吐量控制应用于 更改源处理器 ,以限制更改源处理期间的 RU 消耗。 这在回填场景中尤为有用,尤其是在更改源处理器读取大量数据时。 此集成需要Azure Cosmos DB Java SDK v4 版本 4.54.0 或更高版本。
若要在更改源处理器上启用吞吐量控制,请在生成处理器之前在源容器上配置吞吐量控制组:
// Configure throughput control on the feed container
ThroughputControlGroupConfig groupConfig =
new ThroughputControlGroupConfigBuilder()
.groupName("cfpControlGroup")
.targetThroughputThreshold(0.3) // limit to 30% of provisioned throughput
.defaultControlGroup(true)
.build();
feedContainer.enableLocalThroughputControlGroup(groupConfig);
// Build the Change Feed Processor — it inherits the throughput control settings
ChangeFeedProcessor changeFeedProcessor = new ChangeFeedProcessorBuilder()
.hostName("host-1")
.feedContainer(feedContainer)
.leaseContainer(leaseContainer)
.handleChanges(docs -> {
for (JsonNode doc : docs) {
// process each change
}
})
.buildChangeFeedProcessor();
还可以将多区域吞吐量控制与更改源处理器配合使用,以跨多个处理器实例协调 RU 限制。