APPLIES TO: NoSQL
This article describes the required steps to migrate an existing application's code that uses the Java bulk executor library to the bulk support feature in the latest version of the Java SDK.
To use bulk support in the Java SDK, include the imports below (the examples in this article also use Reactor's Flux type):
import com.azure.cosmos.models.*;
import reactor.core.publisher.Flux;
Bulk support in the Java V4 SDK works by adding documents to a reactive stream object. For example, you can add each document individually:
Family andersenFamilyItem = Families.getAndersenFamilyItem();
Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
Family johnsonFamilyItem = Families.getJohnsonFamilyItem();
Family smithFamilyItem = Families.getSmithFamilyItem();
// Setup family items to create
Flux<Family> families = Flux.just(andersenFamilyItem, wakefieldFamilyItem, johnsonFamilyItem, smithFamilyItem);
Or you can add the documents to the stream from a list, using fromIterable:
class SampleDoc {
public SampleDoc() {
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
private String id="";
}
List<SampleDoc> docList = new ArrayList<>();
for (int i = 1; i <= 5; i++){
SampleDoc doc = new SampleDoc();
String id = "id-"+i;
doc.setId(id);
docList.add(doc);
}
Flux<SampleDoc> docs = Flux.fromIterable(docList);
To bulk create or upsert items (similar to using DocumentBulkExecutor.importAll), pass the reactive stream to a method like the following:
private void bulkUpsertItems(Flux<Family> families) {
Flux<CosmosItemOperation> cosmosItemOperations = families.map(
family -> CosmosBulkOperations.getUpsertItemOperation(family, new PartitionKey(family.getLastName())));
container.executeBulkOperations(cosmosItemOperations).blockLast();
}
You can also use a method like the following, which only creates items (it doesn't upsert):
private void bulkCreateItems(Flux<Family> families) {
Flux<CosmosItemOperation> cosmosItemOperations = families.map(
family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
container.executeBulkOperations(cosmosItemOperations).blockLast();
}
The DocumentBulkExecutor.importAll method in the old bulk executor library was also used to bulk patch items, as was the old DocumentBulkExecutor.mergeAll method, which supported only the set patch operation type. To do bulk patch operations in the V4 SDK, first create the patch operations:
CosmosPatchOperations patchOps = CosmosPatchOperations.create().add("/country", "United States")
.set("/registered", 0);
Then you can pass the operations, along with the reactive stream of documents, to a method like the following. In this example, we apply both the add and set patch operation types. The full set of supported patch operation types can be found in our overview of partial document update in Azure Cosmos DB.
private void bulkPatchItems(Flux<Family> families, CosmosPatchOperations operations) {
Flux<CosmosItemOperation> cosmosItemOperations = families.map(
family -> CosmosBulkOperations
.getPatchItemOperation(family.getId(), new PartitionKey(family.getLastName()), operations));
container.executeBulkOperations(cosmosItemOperations).blockLast();
}
Note
In the above example, we apply add and set to patch elements whose root parent exists. You can't patch a path whose root parent doesn't exist: Azure Cosmos DB partial document update is inspired by JSON Patch RFC 6902, which requires the parent of a target path to be present. If you need to patch where the root parent doesn't exist, first read back the full documents, then use a method like the following to replace the documents:
private void bulkReplaceItems(Flux<Family> families) {
Flux<CosmosItemOperation> cosmosItemOperations = families.map(
family -> CosmosBulkOperations
.getReplaceItemOperation(family.getId(), family, new PartitionKey(family.getLastName())));
container.executeBulkOperations(cosmosItemOperations).blockLast();
}
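The parent-path constraint from the note above can be made concrete with a small plain-Java sketch (not SDK code; nested maps stand in for the JSON document, and trySet is a hypothetical helper): a path like /address/city can only be patched once the /address object exists.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of the RFC 6902 constraint: a nested patch path such
// as /address/city can only be applied when its parent object (/address)
// already exists. Nested maps stand in for the JSON document.
public class PatchParentDemo {

    // Attempts doc[parent][child] = value; returns false when the parent
    // object is missing, which is when the real patch would fail.
    @SuppressWarnings("unchecked")
    public static boolean trySet(Map<String, Object> doc, String parent,
                                 String child, Object value) {
        Object node = doc.get(parent);
        if (!(node instanceof Map)) {
            return false; // root parent missing: can't patch this path
        }
        ((Map<String, Object>) node).put(child, value);
        return true;
    }

    public static void main(String[] args) {
        Map<String, Object> doc = new HashMap<>();
        doc.put("id", "id-1");
        // Fails: the document has no "address" object yet.
        System.out.println(trySet(doc, "address", "city", "Seattle"));
        // Once the parent exists, the same operation succeeds.
        doc.put("address", new HashMap<String, Object>());
        System.out.println(trySet(doc, "address", "city", "Seattle"));
    }
}
```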
To bulk delete items (similar to using DocumentBulkExecutor.deleteAll), use a method like the following:
private void bulkDeleteItems(Flux<Family> families) {
Flux<CosmosItemOperation> cosmosItemOperations = families.map(
family -> CosmosBulkOperations
.getDeleteItemOperation(family.getId(), new PartitionKey(family.getLastName())));
container.executeBulkOperations(cosmosItemOperations).blockLast();
}
The bulk support in the Java V4 SDK doesn't handle retries and timeouts natively. You can refer to the guidance in Bulk Executor - Java Library, which includes a sample that implements an abstraction for handling retries and timeouts properly. The sample also includes examples of local and multi-regional throughput control. You can also refer to the section Should my application retry on errors? for more guidance on the kinds of errors that can occur and best practices for handling retries.
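Since retries aren't handled for you, one common approach is to wrap the bulk call in a retry loop with exponential backoff. The sketch below is a minimal, self-contained illustration of that pattern in plain Java; RetryDemo, withRetries, and backoffMillis are hypothetical names, not part of the SDK, and the operation you would actually retry is your bulk execution step.

```java
import java.util.concurrent.Callable;

// Minimal retry sketch (hypothetical helper, not part of the SDK):
// retries a failing operation with exponential backoff, the kind of
// wrapper you might put around a bulk execution step when batches
// report throttled (429) or timed-out (408) responses.
public class RetryDemo {

    // Backoff doubles each attempt: base, 2*base, 4*base, ...
    public static long backoffMillis(int attempt, long baseMillis) {
        return baseMillis * (1L << attempt);
    }

    public static <T> T withRetries(Callable<T> op, int maxAttempts,
                                    long baseMillis) {
        RuntimeException last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return op.call();
            } catch (Exception e) {
                last = new RuntimeException(
                        "attempt " + (attempt + 1) + " failed", e);
                try {
                    Thread.sleep(backoffMillis(attempt, baseMillis));
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            }
        }
        throw last; // all attempts exhausted
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated operation that fails twice, then succeeds.
        String result = withRetries(() -> {
            if (++calls[0] < 3) throw new RuntimeException("transient 429");
            return "ok";
        }, 5, 1);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

In production code you would also cap the backoff, honor any retry-after hint from the service, and retry only the individual operations that failed rather than the whole batch.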
- Bulk samples on GitHub
- Trying to do capacity planning for a migration to Azure Cosmos DB?
- If all you know is the number of vcores and servers in your existing database cluster, read about estimating request units using vCores or vCPUs
- If you know typical request rates for your current database workload, read about estimating request units using Azure Cosmos DB capacity planner