适用范围: NoSQL
本文介绍需要执行哪些步骤,才能将使用 Java 批量执行工具库的现有应用程序的代码迁移到使用最新版 Java SDK 中的批量操作支持功能。
若要在 Java SDK 中使用批量支持,请包括以下导入:
import com.azure.cosmos.models.*;
Java V4 SDK 中的批量支持的工作原理是将文档添加到反应式流对象。 例如,可以单独添加每个文档:
Family andersenFamilyItem = Families.getAndersenFamilyItem();
Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
Family johnsonFamilyItem = Families.getJohnsonFamilyItem();
Family smithFamilyItem = Families.getSmithFamilyItem();
// Setup family items to create
Flux<Family> families = Flux.just(andersenFamilyItem, wakefieldFamilyItem, johnsonFamilyItem, smithFamilyItem);
也可以使用 fromIterable
将文档添加到列表中的流:
class SampleDoc {
public SampleDoc() {
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
private String id="";
}
List<SampleDoc> docList = new ArrayList<>();
for (int i = 1; i <= 5; i++){
SampleDoc doc = new SampleDoc();
String id = "id-"+i;
doc.setId(id);
docList.add(doc);
}
Flux<SampleDoc> docs = Flux.fromIterable(docList);
如果要进行批量创建或更新插入项(类似于使用 DocumentBulkExecutor.importAll),则需要将反应流传递给如下所示的方法:
private void bulkUpsertItems(Flux<Family> families) {
Flux<CosmosItemOperation> cosmosItemOperations = families.map(
family -> CosmosBulkOperations.getUpsertItemOperation(family, new PartitionKey(family.getLastName())));
container.executeBulkOperations(cosmosItemOperations).blockLast();
}
还可以使用如下所示的方法,但它仅用于创建项:
private void bulkCreateItems(Flux<Family> families) {
Flux<CosmosItemOperation> cosmosItemOperations = families.map(
family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
container.executeBulkOperations(cosmosItemOperations).blockLast();
}
旧 BulkExecutor 库中的 DocumentBulkExecutor.importAll 方法还用于批量修补项。 旧 DocumentBulkExecutor.mergeAll 方法也用于修补,但仅用于 set
修补操作类型。 若要在 V4 SDK 中执行批量修补操作,首先需要创建修补操作:
CosmosPatchOperations patchOps = CosmosPatchOperations.create().add("/country", "United States")
.set("/registered", 0);
然后,可以将操作以及文档反应流传递给如下所示的方法。 在此示例中,我们应用 add
和 set
修补操作类型。 可在此处找到支持的完整修补程序操作类型集,这些内容在有关 Azure Cosmos DB 中的部分文档更新的概述中。
private void bulkPatchItems(Flux<Family> families, CosmosPatchOperations operations) {
Flux<CosmosItemOperation> cosmosItemOperations = families.map(
family -> CosmosBulkOperations
.getPatchItemOperation(family.getId(), new PartitionKey(family.getLastName()), operations));
container.executeBulkOperations(cosmosItemOperations).blockLast();
}
备注
在上面的示例中,我们应用 add
和 set
来修补其根父级存在的元素。 但是,不能在根父级不存在的情况下执行此操作。 这是因为 Azure Cosmos DB 部分文档更新的灵感源自 JSON Patch RFC 6902。 如果修补根父级不存在的位置,请先读回完整文档,然后使用如下所示的方法来替换文档:
private void bulkReplaceItems(Flux<Family> families) {
Flux<CosmosItemOperation> cosmosItemOperations = families.map(
family -> CosmosBulkOperations
.getReplaceItemOperation(family.getId(), family, new PartitionKey(family.getLastName())));
container.executeBulkOperations(cosmosItemOperations).blockLast();
}
如果要进行批量删除(类似于使用 DocumentBulkExecutor.deleteAll),则需要使用批量删除:
private void bulkDeleteItems(Flux<Family> families) {
Flux<CosmosItemOperation> cosmosItemOperations = families.map(
family -> CosmosBulkOperations
.getDeleteItemOperation(family.getId(), new PartitionKey(family.getLastName())));
container.executeBulkOperations(cosmosItemOperations).blockLast();
}
Java V4 SDK 中的批量支持无法以原生方式处理重试和超时。 可以参考批量执行工具 - Java 库中的指南,其中包括一个示例,用于实现正确处理重试和超时所需的抽象。 此示例还包含本地和多区域吞吐量控制的示例。 也可参阅应用程序是否应在出错时进行重试部分,以获取有关可能发生的不同类型的错误的更多指导,以及处理重试的最佳做法。
- GitHub 上的批处理示例
- 正在尝试为迁移到 Azure Cosmos DB 进行容量计划?
- 如果只知道现有数据库群集中的 vCore 和服务器数量,请阅读使用 vCore 或 vCPU 估算请求单位
- 若知道当前数据库工作负载的典型请求速率,请阅读使用 Azure Cosmos DB 容量计划工具估算请求单位