从批量执行程序库迁移到 Azure Cosmos DB Java V4 SDK 中的批量支持

本文介绍将现有应用程序代码迁移到最新版本 Java SDK 中的批量支持功能所需的步骤。

启用批量支持

若要在 Java SDK 中使用批量支持,请在此处进行导入:

import com.azure.cosmos.models.*;

将文档添加到反应式流

Java V4 SDK 中的批量支持的工作原理是将文档添加到反应式流对象。 例如,可以单独添加每个文档:

Family andersenFamilyItem = Families.getAndersenFamilyItem();
Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
Family johnsonFamilyItem = Families.getJohnsonFamilyItem();
Family smithFamilyItem = Families.getSmithFamilyItem();

//  Setup family items to create
Flux<Family> families = Flux.just(andersenFamilyItem, wakefieldFamilyItem, johnsonFamilyItem, smithFamilyItem);

或者,您可以使用 fromIterable 将文档从列表中添加到流中:

class SampleDoc {
    public SampleDoc() {
    }
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
    private String id="";
}
List<SampleDoc> docList = new ArrayList<>();
for (int i = 1; i <= 5; i++){ 
    SampleDoc doc = new SampleDoc();           
    String id = "id-"+i;
    doc.setId(id);
    docList.add(doc);
}
           
Flux<SampleDoc> docs = Flux.fromIterable(docList);

如果要批量创建或更新插入项(类似于使用 DocumentBulkExecutor.importAll),需要将反应式流传递给如下示例的方法:

private void bulkUpsertItems(Flux<Family> families) {
    Flux<CosmosItemOperation> cosmosItemOperations = families.map(
        family -> CosmosBulkOperations.getUpsertItemOperation(family, new PartitionKey(family.getLastName())));
    container.executeBulkOperations(cosmosItemOperations).blockLast();
}

也可以使用如下所示的方法,但此方法仅用于创建项:

private void bulkCreateItems(Flux<Family> families) {
    Flux<CosmosItemOperation> cosmosItemOperations = families.map(
        family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
    container.executeBulkOperations(cosmosItemOperations).blockLast();
}

BulkExecutor 库中的 DocumentBulkExecutor.importAll 方法也用于批量 更新 项。 旧的 DocumentBulkExecutor.mergeAll 方法也用于修补,但仅用于 set 修补作类型。 若要在 V4 SDK 中执行批量修补作,首先需要创建修补作:

CosmosPatchOperations patchOps = CosmosPatchOperations.create().add("/country", "United States")
        .set("/registered", 0);

然后,可以将操作及文档的响应式流传递给如下所示的方法。 在此示例中,我们将应用addset修补操作类型。 在有关Azure Cosmos DB 中部分文档更新概述中,可以找到支持的完整修补操作类型集。

private void bulkPatchItems(Flux<Family> families, CosmosPatchOperations operations) {
    Flux<CosmosItemOperation> cosmosItemOperations = families.map(
        family -> CosmosBulkOperations
            .getPatchItemOperation(family.getId(), new PartitionKey(family.getLastName()), operations));
    container.executeBulkOperations(cosmosItemOperations).blockLast();
}

注释

在上一示例中,我们将应用 addset 修补其根父级存在的元素。 但是,由于根父级 不存在,不能在此处执行该操作。 这是因为 Azure Cosmos DB 部分文档更新 受 JSON 修补程序 RFC 6902 的启发。 如果在根父级不存在的情况下进行修补,首先读回完整文档,然后使用这里提到的方法来替换这些文档:

private void bulkReplaceItems(Flux<Family> families) {
    Flux<CosmosItemOperation> cosmosItemOperations = families.map(
        family -> CosmosBulkOperations
            .getReplaceItemOperation(family.getId(), family, new PartitionKey(family.getLastName())));
    container.executeBulkOperations(cosmosItemOperations).blockLast();
}

如果要进行批量 删除 (类似于使用 DocumentBulkExecutor.deleteAll),则需要使用批量删除:

private void bulkDeleteItems(Flux<Family> families) {
    Flux<CosmosItemOperation> cosmosItemOperations = families.map(
        family -> CosmosBulkOperations
            .getDeleteItemOperation(family.getId(), new PartitionKey(family.getLastName())));
    container.executeBulkOperations(cosmosItemOperations).blockLast();
}

重试、超时和吞吐量控制

Java V4 SDK 中的批量支持并不会原生处理重试和超时。 可以参考批量执行程序 - Java 库中的指南,该指南包含一个实现重试和超时正确处理的抽象示例。 此示例还包含本地和多区域吞吐量控制的示例。 您也可以参阅应用程序是否应该在错误时重试一节,以获取有关可能发生的不同种类错误的更多指导,以及处理重试的最佳实践。

后续步骤