从批量执行工具库迁移到 Azure Cosmos DB Java V4 SDK 中的批量操作支持

适用范围: NoSQL

本文介绍需要执行哪些步骤,才能将使用 Java 批量执行工具库的现有应用程序的代码迁移到使用最新版 Java SDK 中的批量操作支持功能。

启用批量操作支持

若要在 Java SDK 中使用批量支持,请包括以下导入:

import com.azure.cosmos.models.*;

将文档添加到反应式流

Java V4 SDK 中的批量支持的工作原理是将文档添加到反应式流对象。 例如,可以单独添加每个文档:

Family andersenFamilyItem = Families.getAndersenFamilyItem();
Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
Family johnsonFamilyItem = Families.getJohnsonFamilyItem();
Family smithFamilyItem = Families.getSmithFamilyItem();

//  Setup family items to create
Flux<Family> families = Flux.just(andersenFamilyItem, wakefieldFamilyItem, johnsonFamilyItem, smithFamilyItem);

也可以使用 fromIterable 将文档添加到列表中的流:

class SampleDoc {
    public SampleDoc() {
    }
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
    private String id="";
}
List<SampleDoc> docList = new ArrayList<>();
for (int i = 1; i <= 5; i++){ 
    SampleDoc doc = new SampleDoc();           
    String id = "id-"+i;
    doc.setId(id);
    docList.add(doc);
}
           
Flux<SampleDoc> docs = Flux.fromIterable(docList);

如果要进行批量创建或更新插入项(类似于使用 DocumentBulkExecutor.importAll),则需要将反应流传递给如下所示的方法:

private void bulkUpsertItems(Flux<Family> families) {
    Flux<CosmosItemOperation> cosmosItemOperations = families.map(
        family -> CosmosBulkOperations.getUpsertItemOperation(family, new PartitionKey(family.getLastName())));
    container.executeBulkOperations(cosmosItemOperations).blockLast();
}

还可以使用如下所示的方法,但它仅用于创建项:

private void bulkCreateItems(Flux<Family> families) {
    Flux<CosmosItemOperation> cosmosItemOperations = families.map(
        family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
    container.executeBulkOperations(cosmosItemOperations).blockLast();
}

旧 BulkExecutor 库中的 DocumentBulkExecutor.importAll 方法还用于批量修补项。 旧 DocumentBulkExecutor.mergeAll 方法也用于修补,但仅用于 set 修补操作类型。 若要在 V4 SDK 中执行批量修补操作,首先需要创建修补操作:

CosmosPatchOperations patchOps = CosmosPatchOperations.create().add("/country", "United States")
        .set("/registered", 0);

然后,可以将操作以及文档反应流传递给如下所示的方法。 在此示例中,我们应用 addset 修补操作类型。 可在此处找到支持的完整修补程序操作类型集,这些内容在有关 Azure Cosmos DB 中的部分文档更新的概述中。

private void bulkPatchItems(Flux<Family> families, CosmosPatchOperations operations) {
    Flux<CosmosItemOperation> cosmosItemOperations = families.map(
        family -> CosmosBulkOperations
            .getPatchItemOperation(family.getId(), new PartitionKey(family.getLastName()), operations));
    container.executeBulkOperations(cosmosItemOperations).blockLast();
}

备注

在上面的示例中,我们应用 addset 来修补其根父级存在的元素。 但是,不能在根父级不存在的情况下执行此操作。 这是因为 Azure Cosmos DB 部分文档更新的灵感源自 JSON Patch RFC 6902。 如果修补根父级不存在的位置,请先读回完整文档,然后使用如下所示的方法来替换文档:

private void bulkReplaceItems(Flux<Family> families) {
    Flux<CosmosItemOperation> cosmosItemOperations = families.map(
        family -> CosmosBulkOperations
            .getReplaceItemOperation(family.getId(), family, new PartitionKey(family.getLastName())));
    container.executeBulkOperations(cosmosItemOperations).blockLast();
}

如果要进行批量删除(类似于使用 DocumentBulkExecutor.deleteAll),则需要使用批量删除:

private void bulkDeleteItems(Flux<Family> families) {
    Flux<CosmosItemOperation> cosmosItemOperations = families.map(
        family -> CosmosBulkOperations
            .getDeleteItemOperation(family.getId(), new PartitionKey(family.getLastName())));
    container.executeBulkOperations(cosmosItemOperations).blockLast();
}

重试、超时和吞吐量控制

Java V4 SDK 中的批量支持无法以原生方式处理重试和超时。 可以参考批量执行工具 - Java 库中的指南,其中包括一个示例,用于实现正确处理重试和超时所需的抽象。 此示例还包含本地和多区域吞吐量控制的示例。 也可参阅应用程序是否应在出错时进行重试部分,以获取有关可能发生的不同类型的错误的更多指导,以及处理重试的最佳做法。

后续步骤