本文介绍将现有应用程序代码迁移到最新版本 Java SDK 中的批量支持功能所需的步骤。
启用批量支持
若要在 Java SDK 中使用批量支持,请在此处进行导入:
import com.azure.cosmos.models.*;
将文档添加到反应式流
Java V4 SDK 中的批量支持的工作原理是将文档添加到反应式流对象。 例如,可以单独添加每个文档:
Family andersenFamilyItem = Families.getAndersenFamilyItem();
Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
Family johnsonFamilyItem = Families.getJohnsonFamilyItem();
Family smithFamilyItem = Families.getSmithFamilyItem();
// Setup family items to create
Flux<Family> families = Flux.just(andersenFamilyItem, wakefieldFamilyItem, johnsonFamilyItem, smithFamilyItem);
或者,您可以使用 fromIterable 将文档从列表中添加到流中:
class SampleDoc {
public SampleDoc() {
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
private String id="";
}
List<SampleDoc> docList = new ArrayList<>();
for (int i = 1; i <= 5; i++){
SampleDoc doc = new SampleDoc();
String id = "id-"+i;
doc.setId(id);
docList.add(doc);
}
Flux<SampleDoc> docs = Flux.fromIterable(docList);
如果要批量创建或更新插入项(类似于使用 DocumentBulkExecutor.importAll),需要将反应式流传递给如下示例的方法:
private void bulkUpsertItems(Flux<Family> families) {
Flux<CosmosItemOperation> cosmosItemOperations = families.map(
family -> CosmosBulkOperations.getUpsertItemOperation(family, new PartitionKey(family.getLastName())));
container.executeBulkOperations(cosmosItemOperations).blockLast();
}
也可以使用如下所示的方法,但此方法仅用于创建项:
private void bulkCreateItems(Flux<Family> families) {
Flux<CosmosItemOperation> cosmosItemOperations = families.map(
family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
container.executeBulkOperations(cosmosItemOperations).blockLast();
}
旧 BulkExecutor 库中的 DocumentBulkExecutor.importAll 方法也用于批量 更新 项。 旧的 DocumentBulkExecutor.mergeAll 方法也用于修补,但仅用于 set 修补作类型。 若要在 V4 SDK 中执行批量修补作,首先需要创建修补作:
CosmosPatchOperations patchOps = CosmosPatchOperations.create().add("/country", "United States")
.set("/registered", 0);
然后,可以将操作及文档的响应式流传递给如下所示的方法。 在此示例中,我们将应用add和set修补操作类型。 在有关Azure Cosmos DB 中部分文档更新的概述中,可以找到支持的完整修补操作类型集。
private void bulkPatchItems(Flux<Family> families, CosmosPatchOperations operations) {
Flux<CosmosItemOperation> cosmosItemOperations = families.map(
family -> CosmosBulkOperations
.getPatchItemOperation(family.getId(), new PartitionKey(family.getLastName()), operations));
container.executeBulkOperations(cosmosItemOperations).blockLast();
}
注释
在上一示例中,我们将应用 add 和 set 修补其根父级存在的元素。 但是,由于根父级 不存在,不能在此处执行该操作。 这是因为 Azure Cosmos DB 部分文档更新 受 JSON 修补程序 RFC 6902 的启发。 如果在根父级不存在的情况下进行修补,首先读回完整文档,然后使用这里提到的方法来替换这些文档:
private void bulkReplaceItems(Flux<Family> families) {
Flux<CosmosItemOperation> cosmosItemOperations = families.map(
family -> CosmosBulkOperations
.getReplaceItemOperation(family.getId(), family, new PartitionKey(family.getLastName())));
container.executeBulkOperations(cosmosItemOperations).blockLast();
}
如果要进行批量 删除 (类似于使用 DocumentBulkExecutor.deleteAll),则需要使用批量删除:
private void bulkDeleteItems(Flux<Family> families) {
Flux<CosmosItemOperation> cosmosItemOperations = families.map(
family -> CosmosBulkOperations
.getDeleteItemOperation(family.getId(), new PartitionKey(family.getLastName())));
container.executeBulkOperations(cosmosItemOperations).blockLast();
}
重试、超时和吞吐量控制
Java V4 SDK 中的批量支持并不会原生处理重试和超时。 可以参考批量执行程序 - Java 库中的指南,该指南包含一个实现重试和超时正确处理的抽象的示例。 此示例还包含本地和多区域吞吐量控制的示例。 您也可以参阅应用程序是否应该在错误时重试一节,以获取有关可能发生的不同种类错误的更多指导,以及处理重试的最佳实践。
后续步骤
- GitHub 上的批量示例
- 尝试为迁移到 Azure Cosmos DB 进行容量规划?
- 如果你只知道现有数据库群集中的 vCore 和服务器数量,请阅读根据 vCore 或 vCPU 数量估算请求单位数
- 若知道当前数据库工作负载的典型请求速率,请阅读使用 Azure Cosmos DB 容量计划工具估算请求单位