针对 Azure Cosmos DB 数据执行批量操作

本教程演示如何在 Azure Cosmos DB Java V4 SDK 中执行批量操作。 此版本的 SDK 包括批量执行程序库。 如果使用的是旧版 Java SDK,请迁移到 最新版本。 Azure Cosmos DB Java V4 SDK 是当前建议用于Java批量支持的解决方案。

目前,批量执行工具库仅受 Azure Cosmos DB for NoSQL 和 API for Gremlin 帐户支持。 若要了解如何将批量执行程序.NET库与适用于 Gremlin 的 API 配合使用,请参阅<在 Gremlin 的Azure Cosmos DB中执行批量操作

先决条件

克隆示例应用程序

从 GitHub 下载 Java V4 SDK 的示例存储库。 这些示例应用程序对 Azure Cosmos DB 执行 CRUD 操作和其他常见操作。 若要克隆存储库,请打开命令提示符,转到要复制应用程序的目录,然后运行以下命令:

 git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples 

克隆的存储库在 SampleBulkQuickStartAsync.java 文件夹中包含示例 azure-cosmos-java-sql-api-samples/src/main/java/com/azure/cosmos/examples/bulk/async。 应用程序生成文档,并执行操作以批量创建、更新或插入、替换和删除 Azure Cosmos DB 中的项。 以下部分查看示例应用中的代码。

Azure Cosmos DB 中的批量执行

  1. Azure Cosmos DB 的连接字符串将作为参数读取,并分配到在 /examples/common/AccountSettings.java 文件中定义的变量。 必须设置这些环境变量
ACCOUNT_HOST=your account hostname;ACCOUNT_KEY=your account primary key

若要运行批量示例,请指定其 Main 类:

com.azure.cosmos.examples.bulk.async.SampleBulkQuickStartAsync
  1. 使用以下语句初始化 CosmosAsyncClient 对象:
client = new CosmosClientBuilder()
    .endpoint(AccountSettings.HOST)
        .credential(new DefaultAzureCredentialBuilder().build())
    .preferredRegions(preferredRegions)
    .contentResponseOnWriteEnabled(true)
    .consistencyLevel(ConsistencyLevel.SESSION).buildAsyncClient();
  1. 该示例创建了一个异步数据库和容器。 然后,它会创建多个要对其执行批量操作的文档。 它将这些文档添加到 Flux<Family> 反应式流对象中:
Family andersenFamilyItem = Families.getAndersenFamilyItem();
Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
Family johnsonFamilyItem = Families.getJohnsonFamilyItem();
Family smithFamilyItem = Families.getSmithFamilyItem();

//  Setup family items to create
Flux<Family> families = Flux.just(andersenFamilyItem, wakefieldFamilyItem, johnsonFamilyItem, smithFamilyItem);
  1. 该示例包含用于批量创建、更新插入、替换和删除的方法。 在每个方法中,我们将 BulkWriter Flux<Family> 流中的系列文档映射到 CosmosBulkOperations 中的多个方法调用。 这些操作将添加到另一个反应式流对象 Flux<CosmosItemOperation>。 然后,流会传递给我们在开始时创建的异步 executeBulkOperationscontainer 方法,并批量执行操作。 以下面的批量创建方法为例:
private void bulkCreateItems(Flux<Family> families) {
    Flux<CosmosItemOperation> cosmosItemOperations = families.map(
        family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
    container.executeBulkOperations(cosmosItemOperations).blockLast();
}
  1. BulkWriter.java 示例应用程序相同的目录中的类演示如何处理批量执行期间发生的速率限制(429)和超时(408)错误,以及如何重试这些操作。 以下方法还演示如何实现本地和多区域吞吐量控制。
private void bulkUpsertItemsWithBulkWriterAbstraction() {
    Family andersenFamilyItem = Families.getAndersenFamilyItem();
    Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
    CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
    CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
    BulkWriter bulkWriter = new BulkWriter(container);
    bulkWriter.scheduleWrites(andersonItemOperation);
    bulkWriter.scheduleWrites(wakeFieldItemOperation);
    bulkWriter.execute().subscribe();
}

private void bulkUpsertItemsWithBulkWriterAbstractionAndLocalThroughPutControl() {
    ThroughputControlGroupConfig groupConfig =
            new ThroughputControlGroupConfigBuilder()
                    .setGroupName("group1")
                    .setTargetThroughput(200)
                    .build();
    container.enableLocalThroughputControlGroup(groupConfig);
    Family andersenFamilyItem = Families.getAndersenFamilyItem();
    Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
    CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
    CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
    BulkWriter bulkWriter = new BulkWriter(container);
    bulkWriter.scheduleWrites(andersonItemOperation);
    bulkWriter.scheduleWrites(wakeFieldItemOperation);
    bulkWriter.execute().subscribe();
}

private void bulkCreateItemsWithBulkWriterAbstractionAndGlobalThroughputControl() {
    String controlContainerId = "throughputControlContainer";
    CosmosAsyncContainer controlContainer = database.getContainer(controlContainerId);
    database.createContainerIfNotExists(controlContainer.getId(), "/groupId").block();

    ThroughputControlGroupConfig groupConfig =
            new ThroughputControlGroupConfigBuilder()
                    .setGroupName("group-" + UUID.randomUUID())
                    .setTargetThroughput(200)
                    .build();

    GlobalThroughputControlConfig globalControlConfig = this.client.createGlobalThroughputControlConfigBuilder(this.database.getId(), controlContainerId)
            .setControlItemRenewInterval(Duration.ofSeconds(5))
            .setControlItemExpireInterval(Duration.ofSeconds(20))
            .build();

    container.enableGlobalThroughputControlGroup(groupConfig, globalControlConfig);
    CosmosItemRequestOptions requestOptions = new CosmosItemRequestOptions();
    requestOptions.setThroughputControlGroupName(groupConfig.getGroupName());
    Family andersenFamilyItem = Families.getAndersenFamilyItem();
    Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
    CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getCreateItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
    CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getCreateItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
    BulkWriter bulkWriter = new BulkWriter(container);
    bulkWriter.scheduleWrites(andersonItemOperation);
    bulkWriter.scheduleWrites(wakeFieldItemOperation);
    bulkWriter.execute().subscribe();
}
  1. 此示例还包括批量创建方法,说明如何添加响应处理和设置执行选项:
private void bulkCreateItemsWithResponseProcessing(Flux<Family> families) {
    Flux<CosmosItemOperation> cosmosItemOperations = families.map(
        family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
    container.executeBulkOperations(cosmosItemOperations).flatMap(cosmosBulkOperationResponse -> {

        CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
        CosmosItemOperation cosmosItemOperation = cosmosBulkOperationResponse.getOperation();

        if (cosmosBulkOperationResponse.getException() != null) {
            logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
        } else if (cosmosBulkItemResponse == null ||
            !cosmosBulkOperationResponse.getResponse().isSuccessStatusCode()) {

            logger.error(
                "The operation for Item ID: [{}]  Item PartitionKey Value: [{}] did not complete " +
                    "successfully with " + "a" + " {} response code.",
                cosmosItemOperation.<Family>getItem().getId(),
                cosmosItemOperation.<Family>getItem().getLastName(),
                cosmosBulkItemResponse != null ? cosmosBulkItemResponse.getStatusCode() : "n/a");
        } else {
            logger.info(
                "Item ID: [{}]  Item PartitionKey Value: [{}]",
                cosmosItemOperation.<Family>getItem().getId(),
                cosmosItemOperation.<Family>getItem().getLastName());
            logger.info("Status Code: {}", cosmosBulkItemResponse.getStatusCode());
            logger.info("Request Charge: {}", cosmosBulkItemResponse.getRequestCharge());
        }
        if (cosmosBulkItemResponse == null) {
            return Mono.error(new IllegalStateException("No response retrieved."));
        } else {
            return Mono.just(cosmosBulkItemResponse);
        }
    }).blockLast();
}

private void bulkCreateItemsWithExecutionOptions(Flux<Family> families) {
    CosmosBulkExecutionOptions bulkExecutionOptions = new CosmosBulkExecutionOptions();

    // The default value for maxMicroBatchConcurrency is 1.
    // By increasing it, it means more concurrent requests will be allowed to be sent to the server, which leads to increased RU usage.
    //
    // Before you increase the value, please examine the RU usage of your container - whether it has been saturated or not.
    // When the RU has already been under saturation, increasing the concurrency will not help the situation,
    // rather it may cause more 429 and request timeout.
    bulkExecutionOptions.setMaxMicroBatchConcurrency(2);
    Flux<CosmosItemOperation> cosmosItemOperations = families.map(family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
    container.executeBulkOperations(cosmosItemOperations, bulkExecutionOptions).blockLast();
}

大规模引入策略

对于大规模数据摄取,吞吐量控制和重试策略是避免被限流的主要手段,而不是仅靠批大小或并发度。 专注于批大小和并发,而不解决吞吐量限制,重试不会可靠地防止大规模 429(速率受限)响应。

选择引入方法

Scenario 建议的方法
跨多台计算机的大规模分布式数据摄取 Apache Spark 连接器
需要精细控制的单机引入 带有吞吐量控制的 Java SDK 批量执行器

对于大规模引入, Apache Spark 连接器 是首选选择。 它处理分布式计算、自动重试和回退,以及跨工作器节点的负载均衡,而无需手动优化并发或批处理大小。

带吞吐量控制的 Java SDK 批量执行器

如果您的场景需要直接使用 Java SDK,azure-cosmos-distributed-bulk-sample提供了生产规模数据引入的参考实现。 它演示了以下关键设置:

  • 自动调优的微批处理大小:该示例会根据情况动态调整每个物理分区的批处理大小(从 1 到 100 个文档),以充分利用吞吐量,同时将限流控制在可管理范围内。
  • 可配置的重试计数:默认为每批 20 次重试。 根据对暂时性故障和下游延迟要求的容忍度进行调整。
  • 每台计算机的并发批处理数:默认值为 8 个并发批处理。 建议范围为摄取机器上可用 CPU 内核数的 25%–100%。

Tip

从默认值开始,并监视 429(速率受限)响应速率。 如果出现过多限流,请减少并发批次或实施吞吐量控制。

共享容器的吞吐量控制

如果多个工作负荷共享同一容器,请使用 吞吐量控制组 来限制批量引入消耗的 RU/秒,并阻止其耗尽其他工作负荷:

注释

吞吐量控制需要受支持的最低Azure Cosmos DB Java SDK v4 版本。 吞吐量控制 API 也带有注释 @Beta ,并可能会更改。 使用以下示例之前,请验证 吞吐量控制文档中 的当前版本要求和 API 状态。

ThroughputControlGroupConfig groupConfig =
    new ThroughputControlGroupConfigBuilder()
        .groupName("bulkIngestionGroup")
        .targetThroughputThreshold(0.75) // limit ingestion to 75% of provisioned throughput
        .defaultControlGroup(true)
        .build();

container.enableLocalThroughputControlGroup(groupConfig);

若要跨多个引入计算机协调吞吐量限制,请改用 多区域吞吐量控制

参考实现

Sample Description
azure-cosmos-distributed-bulk-sample 使用作业跟踪、可重启批处理、自动优化的微批大小以及可配置的重试和并发设置进行端到端分布式引入。
ThroughputControlQuickstartAsync.java 本地吞吐量控制、通过元数据容器共享 RU 限制的多区域吞吐量控制,以及基于优先级的限流。

性能提示

使用批量执行程序库时,请考虑以下几点以提高性能:

  • 为获得最佳性能,请在 Azure VM 中运行应用程序,该 Azure VM 所处区域与 Azure Cosmos DB 帐户写入区域相同。

  • 实现更高的吞吐量:

    • 设置足够大的 JVM 堆大小,以避免处理大量文档时出现内存问题。 建议的堆大小: max(3 GB, 3 * sizeof(all documents passed to bulk import API in one batch))
    • 批量操作具有预处理阶段,因此处理大型文档集时可获得更高的吞吐量。 例如,通过运行 10 次批量导入、每次导入 1,000,000 个文档来导入 10,000,000 个文档,比运行 100 次批量导入、每次导入 100,000 个文档更高效。
  • 在单个虚拟机中,为整个应用程序实例化一个 CosmosAsyncClient 对象,该对象对应于某个特定的 Azure Cosmos DB 容器。

  • 单次批量操作 API 执行会因在内部生成多个任务而大量占用客户端机器的 CPU 和网络 I/O 资源。 避免在应用程序进程中生成多个并发任务,其中每个任务执行批量操作 API 调用。 如果单个虚拟机上运行的单个批量操作 API 调用无法使用整个容器的吞吐量(如果容器的吞吐量 > 为 100 万 RU/秒),请创建单独的虚拟机以并发执行批量操作 API 调用。