Perform bulk operations on Azure Cosmos DB data
APPLIES TO: NoSQL
This tutorial provides instructions on performing bulk operations in the Azure Cosmos DB Java V4 SDK. This version of the SDK comes with the bulk executor library built in. If you're using an older version of the Java SDK, we recommend that you migrate to the latest version. The Azure Cosmos DB Java V4 SDK is the current recommended solution for Java bulk support.
Currently, the bulk executor library is supported only by Azure Cosmos DB for NoSQL and API for Gremlin accounts. To learn about using the bulk executor .NET library with API for Gremlin, see Perform bulk operations in Azure Cosmos DB for Gremlin.
Prerequisites
- If you don't have an Azure subscription, create a trial subscription before you begin.
- Alternatively, you can use the Azure Cosmos DB Emulator with the https://localhost:8081 endpoint. The primary key is provided in Authenticating requests.
- Java Development Kit (JDK) 1.8+
  - On Ubuntu, run apt-get install default-jdk to install the JDK.
  - Be sure to set the JAVA_HOME environment variable to point to the folder where the JDK is installed.
- Download and install a Maven binary archive
  - On Ubuntu, you can run apt-get install maven to install Maven.
- Create an Azure Cosmos DB for NoSQL account by using the steps described in the create database account section of the Java quickstart article.
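The sample needs an account endpoint and key, whether you point it at the emulator or at an Azure account. The following is a minimal sketch of one way to resolve such settings. It assumes that a JVM system property takes precedence over an environment variable, and that the emulator endpoint is an acceptable fallback for the host only. SettingsResolver and resolve are hypothetical names used for illustration. The sample's own AccountSettings class may work differently.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not the sample's AccountSettings class.
// Lookup order (an assumption for this sketch): JVM system property (-DNAME=value),
// then environment variable, then the supplied fallback.
public class SettingsResolver {

    static String resolve(String name, Map<String, String> systemProps,
                          Map<String, String> env, String fallback) {
        String fromProperty = trimToNull(systemProps.get(name));
        if (fromProperty != null) {
            return fromProperty;
        }
        String fromEnv = trimToNull(env.get(name));
        if (fromEnv != null) {
            return fromEnv;
        }
        return fallback;
    }

    // Treat null, empty, and whitespace-only values as "not set".
    private static String trimToNull(String value) {
        if (value == null) {
            return null;
        }
        String trimmed = value.trim();
        return trimmed.isEmpty() ? null : trimmed;
    }

    public static void main(String[] args) {
        // Copy the JVM system properties into a plain map.
        Map<String, String> props = new HashMap<>();
        for (String key : System.getProperties().stringPropertyNames()) {
            props.put(key, System.getProperty(key));
        }
        // Fall back to the emulator endpoint only for the host.
        String host = resolve("ACCOUNT_HOST", props, System.getenv(), "https://localhost:8081");
        // No fallback for the key: report it as missing instead of guessing a value.
        String key = resolve("ACCOUNT_KEY", props, System.getenv(), null);
        System.out.println("Endpoint: " + host);
        System.out.println(key == null ? "ACCOUNT_KEY is not set" : "ACCOUNT_KEY is set");
    }
}
```

Leaving the key without a default is a deliberate choice in this sketch. A missing key is reported instead of silently replaced.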
Clone the sample application
Now let's switch to working with code by downloading a generic samples repository for the Java V4 SDK for Azure Cosmos DB from GitHub. These sample applications perform CRUD operations and other common operations on Azure Cosmos DB. To clone the repository, open a command prompt, navigate to the directory where you want to copy the application, and run the following command:
git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples
The cloned repository contains a sample file, SampleBulkQuickStartAsync.java, in the /azure-cosmos-java-sql-api-samples/src/main/java/com/azure/cosmos/examples/bulk/async folder. The application generates documents and executes operations to bulk create, upsert, replace, patch, and delete items in Azure Cosmos DB. The next sections review the code in the sample app.
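At its core, each bulk method in the sample turns documents into operations. The following toy model uses only the standard library and doesn't use the Azure Cosmos DB SDK. It shows that mapping: every Family becomes one operation descriptor that carries the operation type, the item id, and the partition key value taken from lastName. The sample's container is partitioned on /lastName. OperationType, ItemOperation, and toOperations are hypothetical stand-ins for the SDK's CosmosItemOperation and the CosmosBulkOperations factory methods.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Toy model only: these types are hypothetical stand-ins, not Azure Cosmos DB SDK classes.
public class BulkOperationModel {

    // The kinds of operations the sample executes in bulk.
    enum OperationType { CREATE, UPSERT, REPLACE, PATCH, DELETE }

    // Minimal stand-in for the sample's Family document.
    static final class Family {
        final String id;
        final String lastName;

        Family(String id, String lastName) {
            this.id = id;
            this.lastName = lastName;
        }
    }

    // One operation: what to do, to which item, and in which logical partition.
    static final class ItemOperation {
        final OperationType type;
        final String itemId;
        final String partitionKeyValue;

        ItemOperation(OperationType type, String itemId, String partitionKeyValue) {
            this.type = type;
            this.itemId = itemId;
            this.partitionKeyValue = partitionKeyValue;
        }

        @Override
        public String toString() {
            return type + "(" + itemId + ", pk=" + partitionKeyValue + ")";
        }
    }

    // Map each document to one operation, using lastName as the partition key value,
    // mirroring the sample's container, which is partitioned on /lastName.
    static List<ItemOperation> toOperations(List<Family> families, OperationType type) {
        return families.stream()
                .map(family -> new ItemOperation(type, family.id, family.lastName))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Family> families = Arrays.asList(
                new Family("andersen-1", "Andersen"),
                new Family("wakefield-1", "Wakefield"));
        // prints [CREATE(andersen-1, pk=Andersen), CREATE(wakefield-1, pk=Wakefield)]
        System.out.println(toOperations(families, OperationType.CREATE));
    }
}
```

In the real sample, calls such as CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())) play the role of toOperations. The resulting Flux<CosmosItemOperation> is then passed to executeBulkOperations.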
Bulk execution in Azure Cosmos DB
- The Azure Cosmos DB connection settings are read from the environment and assigned to variables defined in the /examples/common/AccountSettings.java file. Set these environment variables before you run the sample:
ACCOUNT_HOST=your account hostname;ACCOUNT_KEY=your account primary key
To run the bulk sample, specify its main class:
com.azure.cosmos.examples.bulk.async.SampleBulkQuickStartAsync
- The CosmosAsyncClient object is initialized by using the following statements:
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.examples.bulk.async;
import com.azure.cosmos.*;
import com.azure.cosmos.examples.common.AccountSettings;
import com.azure.cosmos.examples.common.Families;
import com.azure.cosmos.examples.common.Family;
// <CosmosBulkOperationsImport>
import com.azure.cosmos.models.*;
// </CosmosBulkOperationsImport>
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.ArrayList;
import java.util.UUID;
public class SampleBulkQuickStartAsync {
private static final Logger logger = LoggerFactory.getLogger(SampleBulkQuickStartAsync.class);
private final String databaseName = "AzureSampleFamilyDB";
private final String containerName = "FamilyContainer";
private CosmosAsyncClient client;
private CosmosAsyncDatabase database;
private CosmosAsyncContainer container;
public static void main(String[] args) {
SampleBulkQuickStartAsync p = new SampleBulkQuickStartAsync();
try {
logger.info("Starting ASYNC main");
p.getStartedDemo();
logger.info("Demo complete, please hold while resources are released");
} catch (Exception e) {
// Log the failure once, with its stack trace, through the logger
logger.error("Cosmos getStarted failed", e);
} finally {
logger.info("Closing the client");
p.shutdown();
}
}
public void close() {
client.close();
}
private void getStartedDemo() {
logger.info("Using Azure Cosmos DB endpoint: " + AccountSettings.HOST);
ArrayList<String> preferredRegions = new ArrayList<>();
preferredRegions.add("West US");
// Set the preferred location to an Azure Cosmos DB account region.
// West US is just an example. Set the preferred location to the account region closest to your
// application.
// Create async client
// <CreateAsyncClient>
client = new CosmosClientBuilder()
.endpoint(AccountSettings.HOST)
.key(AccountSettings.MASTER_KEY)
.preferredRegions(preferredRegions)
.contentResponseOnWriteEnabled(true)
.consistencyLevel(ConsistencyLevel.SESSION).buildAsyncClient();
// </CreateAsyncClient>
createDatabaseIfNotExists();
createContainerIfNotExists();
// <AddDocsToStream>
Family andersenFamilyItem = Families.getAndersenFamilyItem();
Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
Family johnsonFamilyItem = Families.getJohnsonFamilyItem();
Family smithFamilyItem = Families.getSmithFamilyItem();
// Setup family items to create
Flux<Family> families = Flux.just(andersenFamilyItem, wakefieldFamilyItem, johnsonFamilyItem, smithFamilyItem);
// </AddDocsToStream>
logger.info("Bulk creates.");
bulkCreateItems(families);
andersenFamilyItem.setRegistered(false);
wakefieldFamilyItem.setRegistered(false);
Flux<Family> familiesToUpsert = Flux.just(
andersenFamilyItem, wakefieldFamilyItem, johnsonFamilyItem, smithFamilyItem);
logger.info("Bulk upserts.");
bulkUpsertItems(familiesToUpsert);
andersenFamilyItem.setRegistered(true);
wakefieldFamilyItem.setRegistered(true);
Flux<Family> familiesToReplace = Flux.just(
andersenFamilyItem, wakefieldFamilyItem, johnsonFamilyItem, smithFamilyItem);
logger.info("Bulk replace.");
bulkReplaceItems(familiesToReplace);
logger.info("Bulk deletes.");
bulkDeleteItems(families);
logger.info("Bulk create response processing without errors.");
bulkCreateItemsWithResponseProcessing(families);
logger.info("Bulk create response processing with 409 error");
bulkCreateItemsWithResponseProcessing(families);
logger.info("Bulk deletes.");
bulkDeleteItems(families);
logger.info("Bulk creates with execution options.");
bulkCreateItemsWithExecutionOptions(families);
logger.info("Bulk patches.");
// <PatchOperations>
CosmosPatchOperations patchOps = CosmosPatchOperations.create().add("/country", "United States")
.set("/registered", 0);
// </PatchOperations>
// Note: here we apply "add" and "set" to patch elements whose root parent
// exists, but we cannot do this where the root parent does not exist. When this
// is required, read the full document first, then use replace.
bulkPatchItems(familiesToReplace, patchOps);
logger.info("Bulk deletes.");
bulkDeleteItems(families);
logger.info("Bulk upserts with BulkWriter abstraction");
bulkUpsertItemsWithBulkWriterAbstraction();
logger.info("Bulk upserts with BulkWriter Abstraction and Local Throughput Control");
bulkUpsertItemsWithBulkWriterAbstractionAndLocalThroughPutControl();
logger.info("Bulk creates with BulkWriter abstraction and global throughput control");
bulkCreateItemsWithBulkWriterAbstractionAndGlobalThroughputControl();
}
private void createDatabaseIfNotExists() {
logger.info("Create database " + databaseName + " if not exists.");
// Create database if not exists
// <CreateDatabaseIfNotExists>
Mono<CosmosDatabaseResponse> databaseIfNotExists = client.createDatabaseIfNotExists(databaseName);
databaseIfNotExists.flatMap(databaseResponse -> {
database = client.getDatabase(databaseResponse.getProperties().getId());
logger.info("Checking database " + database.getId() + " completed!\n");
return Mono.empty();
}).block();
// </CreateDatabaseIfNotExists>
}
private void createContainerIfNotExists() {
logger.info("Create container " + containerName + " if not exists.");
// Create container if not exists
// <CreateContainerIfNotExists>
CosmosContainerProperties containerProperties = new CosmosContainerProperties(
containerName, "/lastName");
ThroughputProperties throughputProperties = ThroughputProperties.createManualThroughput(400);
Mono<CosmosContainerResponse> containerIfNotExists = database
.createContainerIfNotExists(containerProperties, throughputProperties);
// Create container with 400 RU/s
CosmosContainerResponse cosmosContainerResponse = containerIfNotExists.block();
assert(cosmosContainerResponse != null);
assert(cosmosContainerResponse.getProperties() != null);
container = database.getContainer(cosmosContainerResponse.getProperties().getId());
// </CreateContainerIfNotExists>
//Modify existing container
containerProperties = cosmosContainerResponse.getProperties();
Mono<CosmosContainerResponse> propertiesReplace =
container.replace(containerProperties, new CosmosContainerRequestOptions());
propertiesReplace.flatMap(containerResponse -> {
logger.info(
"setupContainer(): Container {} in {} has been updated with its new properties.",
container.getId(),
database.getId());
return Mono.empty();
}).onErrorResume((exception) -> {
logger.error(
"setupContainer(): Unable to update properties for container {} in database {}. e: {}",
container.getId(),
database.getId(),
exception.getLocalizedMessage(),
exception);
return Mono.empty();
}).block();
}
// <BulkCreateItems>
private void bulkCreateItems(Flux<Family> families) {
Flux<CosmosItemOperation> cosmosItemOperations = families.map(
family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
container.executeBulkOperations(cosmosItemOperations).blockLast();
}
// </BulkCreateItems>
// <BulkDeleteItems>
private void bulkDeleteItems(Flux<Family> families) {
Flux<CosmosItemOperation> cosmosItemOperations = families.map(
family -> CosmosBulkOperations
.getDeleteItemOperation(family.getId(), new PartitionKey(family.getLastName())));
container.executeBulkOperations(cosmosItemOperations).blockLast();
}
// </BulkDeleteItems>
// <BulkUpsertItems>
private void bulkUpsertItems(Flux<Family> families) {
Flux<CosmosItemOperation> cosmosItemOperations = families.map(
family -> CosmosBulkOperations.getUpsertItemOperation(family, new PartitionKey(family.getLastName())));
container.executeBulkOperations(cosmosItemOperations).blockLast();
}
// </BulkUpsertItems>
// <BulkPatchItems>
private void bulkPatchItems(Flux<Family> families, CosmosPatchOperations operations) {
Flux<CosmosItemOperation> cosmosItemOperations = families.map(
family -> CosmosBulkOperations
.getPatchItemOperation(family.getId(), new PartitionKey(family.getLastName()), operations));
container.executeBulkOperations(cosmosItemOperations).blockLast();
}
// </BulkPatchItems>
// <BulkReplaceItems>
private void bulkReplaceItems(Flux<Family> families) {
Flux<CosmosItemOperation> cosmosItemOperations = families.map(
family -> CosmosBulkOperations
.getReplaceItemOperation(family.getId(), family, new PartitionKey(family.getLastName())));
container.executeBulkOperations(cosmosItemOperations).blockLast();
}
// </BulkReplaceItems>
// <BulkCreateItemsWithResponseProcessingAndExecutionOptions>
private void bulkCreateItemsWithResponseProcessing(Flux<Family> families) {
Flux<CosmosItemOperation> cosmosItemOperations = families.map(
family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
container.executeBulkOperations(cosmosItemOperations).flatMap(cosmosBulkOperationResponse -> {
CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
CosmosItemOperation cosmosItemOperation = cosmosBulkOperationResponse.getOperation();
if (cosmosBulkOperationResponse.getException() != null) {
logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
} else if (cosmosBulkItemResponse == null ||
!cosmosBulkOperationResponse.getResponse().isSuccessStatusCode()) {
logger.error(
"The operation for Item ID: [{}] Item PartitionKey Value: [{}] did not complete successfully with a {} response code.",
cosmosItemOperation.<Family>getItem().getId(),
cosmosItemOperation.<Family>getItem().getLastName(),
cosmosBulkItemResponse != null ? cosmosBulkItemResponse.getStatusCode() : "n/a");
} else {
logger.info(
"Item ID: [{}] Item PartitionKey Value: [{}]",
cosmosItemOperation.<Family>getItem().getId(),
cosmosItemOperation.<Family>getItem().getLastName());
logger.info("Status Code: {}", cosmosBulkItemResponse.getStatusCode());
logger.info("Request Charge: {}", cosmosBulkItemResponse.getRequestCharge());
}
if (cosmosBulkItemResponse == null) {
return Mono.error(new IllegalStateException("No response retrieved."));
} else {
return Mono.just(cosmosBulkItemResponse);
}
}).blockLast();
}
private void bulkCreateItemsWithExecutionOptions(Flux<Family> families) {
CosmosBulkExecutionOptions bulkExecutionOptions = new CosmosBulkExecutionOptions();
// The default value for maxMicroBatchConcurrency is 1.
// Increasing it allows more concurrent requests to be sent to the server, which increases RU usage.
//
// Before you increase the value, check whether the RU usage of your container is already saturated.
// If it is, increasing the concurrency won't help;
// it may instead cause more 429 (throttled) responses and request timeouts.
bulkExecutionOptions.setMaxMicroBatchConcurrency(2);
Flux<CosmosItemOperation> cosmosItemOperations = families.map(family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
container.executeBulkOperations(cosmosItemOperations, bulkExecutionOptions).blockLast();
}
// </BulkCreateItemsWithResponseProcessingAndExecutionOptions>
// <BulkWriterAbstraction>
private void bulkUpsertItemsWithBulkWriterAbstraction() {
Family andersenFamilyItem = Families.getAndersenFamilyItem();
Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
BulkWriter bulkWriter = new BulkWriter(container);
bulkWriter.scheduleWrites(andersonItemOperation);
bulkWriter.scheduleWrites(wakeFieldItemOperation);
bulkWriter.execute().subscribe();
}
private void bulkUpsertItemsWithBulkWriterAbstractionAndLocalThroughPutControl() {
ThroughputControlGroupConfig groupConfig =
new ThroughputControlGroupConfigBuilder()
.setGroupName("group1")
.setTargetThroughput(200)
.build();
container.enableLocalThroughputControlGroup(groupConfig);
Family andersenFamilyItem = Families.getAndersenFamilyItem();
Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
BulkWriter bulkWriter = new BulkWriter(container);
bulkWriter.scheduleWrites(andersonItemOperation);
bulkWriter.scheduleWrites(wakeFieldItemOperation);
bulkWriter.execute().subscribe();
}
private void bulkCreateItemsWithBulkWriterAbstractionAndGlobalThroughputControl() {
String controlContainerId = "throughputControlContainer";
CosmosAsyncContainer controlContainer = database.getContainer(controlContainerId);
database.createContainerIfNotExists(controlContainer.getId(), "/groupId").block();
ThroughputControlGroupConfig groupConfig =
new ThroughputControlGroupConfigBuilder()
.setGroupName("group-" + UUID.randomUUID())
.setTargetThroughput(200)
.build();
GlobalThroughputControlConfig globalControlConfig = this.client.createGlobalThroughputControlConfigBuilder(this.database.getId(), controlContainerId)
.setControlItemRenewInterval(Duration.ofSeconds(5))
.setControlItemExpireInterval(Duration.ofSeconds(20))
.build();
container.enableGlobalThroughputControlGroup(groupConfig, globalControlConfig);
CosmosItemRequestOptions requestOptions = new CosmosItemRequestOptions();
requestOptions.setThroughputControlGroupName(groupConfig.getGroupName());
Family andersenFamilyItem = Families.getAndersenFamilyItem();
Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getCreateItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getCreateItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
BulkWriter bulkWriter = new BulkWriter(container);
bulkWriter.scheduleWrites(andersonItemOperation);
bulkWriter.scheduleWrites(wakeFieldItemOperation);
bulkWriter.execute().subscribe();
}
// </BulkWriterAbstraction>
private void shutdown() {
try {
// Give the BulkWriter sequences started with subscribe() time to complete
Thread.sleep(5000);
//Clean shutdown
logger.info("Deleting Cosmos DB resources");
logger.info("-Deleting container...");
// block() waits for each delete to finish before the client is closed below
if (container != null) container.delete().block();
logger.info("-Deleting database...");
if (database != null) database.delete().block();
logger.info("-Closing the client...");
} catch (InterruptedException err) {
// Restore the interrupt flag instead of swallowing the interruption
Thread.currentThread().interrupt();
logger.error("Interrupted while waiting for pending operations", err);
} catch (Exception err) {
logger.error("Deleting Cosmos DB resources failed, will still attempt to close the client.", err);
}
// The client is null if building it failed in getStartedDemo()
if (client != null) client.close();
logger.info("Done.");
}
}
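The bulkCreateItemsWithResponseProcessing method above inspects each bulk result individually, because one operation can fail while others succeed. The demo's second call to that method is expected to log 409 responses. The following is a minimal, SDK-free sketch of that kind of triage, assuming only the HTTP status code of each result is available. BulkResultTriage, Outcome, classify, and summarize are hypothetical names. In the sample, the status code comes from CosmosBulkItemResponse.getStatusCode().

```java
import java.util.Arrays;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// SDK-free model of per-operation result handling; all names here are hypothetical.
public class BulkResultTriage {

    enum Outcome { SUCCESS, CONFLICT, THROTTLED, OTHER_FAILURE }

    // 2xx: the operation succeeded.
    // 409: conflict, for example creating an item whose id already exists in its partition.
    // 429: the request was rate limited (throttled).
    static Outcome classify(int statusCode) {
        if (statusCode >= 200 && statusCode < 300) {
            return Outcome.SUCCESS;
        }
        if (statusCode == 409) {
            return Outcome.CONFLICT;
        }
        if (statusCode == 429) {
            return Outcome.THROTTLED;
        }
        return Outcome.OTHER_FAILURE;
    }

    // Count outcomes so that failures can be reported after the whole stream has been processed.
    static Map<Outcome, Integer> summarize(List<Integer> statusCodes) {
        Map<Outcome, Integer> counts = new EnumMap<>(Outcome.class);
        for (int code : statusCodes) {
            counts.merge(classify(code), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Illustrative status codes only, not captured from a real run.
        List<Integer> codes = Arrays.asList(201, 201, 409, 429);
        // prints {SUCCESS=2, CONFLICT=1, THROTTLED=1}
        System.out.println(summarize(codes));
    }
}
```

Counting outcomes, instead of failing on the first non-2xx result, matches the sample's approach of logging each failed item and continuing through the rest of the stream.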
- The sample creates an async database and container. It then builds several Family documents in memory, on which the bulk operations are executed, and adds them to a Flux<Family> reactive stream object:
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.examples.bulk.async;
import com.azure.cosmos.*;
import com.azure.cosmos.examples.common.AccountSettings;
import com.azure.cosmos.examples.common.Families;
import com.azure.cosmos.examples.common.Family;
// <CosmosBulkOperationsImport>
import com.azure.cosmos.models.*;
// </CosmosBulkOperationsImport>
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.ArrayList;
import java.util.UUID;
public class SampleBulkQuickStartAsync {
private static final Logger logger = LoggerFactory.getLogger(SampleBulkQuickStartAsync.class);
private final String databaseName = "AzureSampleFamilyDB";
private final String containerName = "FamilyContainer";
private CosmosAsyncClient client;
private CosmosAsyncDatabase database;
private CosmosAsyncContainer container;
public static void main(String[] args) {
SampleBulkQuickStartAsync p = new SampleBulkQuickStartAsync();
try {
logger.info("Starting ASYNC main");
p.getStartedDemo();
logger.info("Demo complete, please hold while resources are released");
} catch (Exception e) {
e.printStackTrace();
logger.error(String.format("Cosmos getStarted failed with %s", e));
} finally {
logger.info("Closing the client");
p.shutdown();
}
}
public void close() {
client.close();
}
private void getStartedDemo() {
logger.info("Using Azure Cosmos DB endpoint: " + AccountSettings.HOST);
ArrayList<String> preferredRegions = new ArrayList<>();
preferredRegions.add("West US");
// Setting the preferred location to Cosmos DB Account region
// West US is just an example. User should set preferred location to the Cosmos DB region closest to the
// application
// Create async client
// <CreateAsyncClient>
client = new CosmosClientBuilder()
.endpoint(AccountSettings.HOST)
.key(AccountSettings.MASTER_KEY)
.preferredRegions(preferredRegions)
.contentResponseOnWriteEnabled(true)
.consistencyLevel(ConsistencyLevel.SESSION).buildAsyncClient();
// </CreateAsyncClient>
createDatabaseIfNotExists();
createContainerIfNotExists();
// <AddDocsToStream>
Family andersenFamilyItem = Families.getAndersenFamilyItem();
Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
Family johnsonFamilyItem = Families.getJohnsonFamilyItem();
Family smithFamilyItem = Families.getSmithFamilyItem();
// Setup family items to create
Flux<Family> families = Flux.just(andersenFamilyItem, wakefieldFamilyItem, johnsonFamilyItem, smithFamilyItem);
// </AddDocsToStream>
logger.info("Bulk creates.");
bulkCreateItems(families);
bulkCreateItems(families);
andersenFamilyItem.setRegistered(false);
wakefieldFamilyItem.setRegistered(false);
Flux<Family> familiesToUpsert = Flux.just(
andersenFamilyItem, wakefieldFamilyItem, johnsonFamilyItem, smithFamilyItem);
logger.info("Bulk upserts.");
bulkUpsertItems(familiesToUpsert);
andersenFamilyItem.setRegistered(true);
wakefieldFamilyItem.setRegistered(true);
Flux<Family> familiesToReplace = Flux.just(
andersenFamilyItem, wakefieldFamilyItem, johnsonFamilyItem, smithFamilyItem);
logger.info("Bulk replace.");
bulkReplaceItems(familiesToReplace);
logger.info("Bulk deletes.");
bulkDeleteItems(families);
logger.info("Bulk create response processing without errors.");
bulkCreateItemsWithResponseProcessing(families);
logger.info("Bulk create response processing with 409 error");
bulkCreateItemsWithResponseProcessing(families);
logger.info("Bulk deletes.");
bulkDeleteItems(families);
logger.info("Bulk creates with execution options.");
bulkCreateItemsWithExecutionOptions(families);
logger.info("Bulk patches.");
// <PatchOperations>
CosmosPatchOperations patchOps = CosmosPatchOperations.create().add("/country", "United States")
.set("/registered", 0);
// </PatchOperations>
// Note: here we apply "add" and "set" to patch elements whose root parent
// exists, but we cannot do this where the root parent does not exist. When this
// is required, read the full document first, then use replace.
bulkPatchItems(familiesToReplace, patchOps);
logger.info("Bulk deletes.");
bulkDeleteItems(families);
logger.info("Bulk upserts with BulkWriter abstraction");
bulkUpsertItemsWithBulkWriterAbstraction();
logger.info("Bulk upserts with BulkWriter Abstraction and Local Throughput Control");
bulkUpsertItemsWithBulkWriterAbstractionAndLocalThroughPutControl();
logger.info("Bulk upserts with BulkWriter Abstraction and Global Throughput Control");
bulkCreateItemsWithBulkWriterAbstractionAndGlobalThroughputControl();
}
private void createDatabaseIfNotExists() {
logger.info("Create database " + databaseName + " if not exists.");
// Create database if not exists
// <CreateDatabaseIfNotExists>
Mono<CosmosDatabaseResponse> databaseIfNotExists = client.createDatabaseIfNotExists(databaseName);
databaseIfNotExists.flatMap(databaseResponse -> {
database = client.getDatabase(databaseResponse.getProperties().getId());
logger.info("Checking database " + database.getId() + " completed!\n");
return Mono.empty();
}).block();
// </CreateDatabaseIfNotExists>
}
private void createContainerIfNotExists() {
logger.info("Create container " + containerName + " if not exists.");
// Create container if not exists
// <CreateContainerIfNotExists>
CosmosContainerProperties containerProperties = new CosmosContainerProperties(
containerName, "/lastName");
ThroughputProperties throughputProperties = ThroughputProperties.createManualThroughput(400);
Mono<CosmosContainerResponse> containerIfNotExists = database
.createContainerIfNotExists(containerProperties, throughputProperties);
// Create container with 400 RU/s
CosmosContainerResponse cosmosContainerResponse = containerIfNotExists.block();
assert(cosmosContainerResponse != null);
assert(cosmosContainerResponse.getProperties() != null);
container = database.getContainer(cosmosContainerResponse.getProperties().getId());
// </CreateContainerIfNotExists>
//Modify existing container
containerProperties = cosmosContainerResponse.getProperties();
Mono<CosmosContainerResponse> propertiesReplace =
container.replace(containerProperties, new CosmosContainerRequestOptions());
propertiesReplace.flatMap(containerResponse -> {
logger.info(
"setupContainer(): Container {}} in {} has been updated with it's new properties.",
container.getId(),
database.getId());
return Mono.empty();
}).onErrorResume((exception) -> {
logger.error(
"setupContainer(): Unable to update properties for container {} in database {}. e: {}",
container.getId(),
database.getId(),
exception.getLocalizedMessage(),
exception);
return Mono.empty();
}).block();
}
// <BulkCreateItems>
private void bulkCreateItems(Flux<Family> families) {
Flux<CosmosItemOperation> cosmosItemOperations = families.map(
family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
container.executeBulkOperations(cosmosItemOperations).blockLast();
}
// </BulkCreateItems>
// <BulkDeleteItems>
private void bulkDeleteItems(Flux<Family> families) {
Flux<CosmosItemOperation> cosmosItemOperations = families.map(
family -> CosmosBulkOperations
.getDeleteItemOperation(family.getId(), new PartitionKey(family.getLastName())));
container.executeBulkOperations(cosmosItemOperations).blockLast();
}
// </BulkDeleteItems>
// <BulkUpsertItems>
private void bulkUpsertItems(Flux<Family> families) {
Flux<CosmosItemOperation> cosmosItemOperations = families.map(
family -> CosmosBulkOperations.getUpsertItemOperation(family, new PartitionKey(family.getLastName())));
container.executeBulkOperations(cosmosItemOperations).blockLast();
}
// </BulkUpsertItems>
// <BulkPatchItems>
private void bulkPatchItems(Flux<Family> families, CosmosPatchOperations operations) {
Flux<CosmosItemOperation> cosmosItemOperations = families.map(
family -> CosmosBulkOperations
.getPatchItemOperation(family.getId(), new PartitionKey(family.getLastName()), operations));
container.executeBulkOperations(cosmosItemOperations).blockLast();
}
// </BulkPatchItems>
// <BulkReplaceItems>
private void bulkReplaceItems(Flux<Family> families) {
Flux<CosmosItemOperation> cosmosItemOperations = families.map(
family -> CosmosBulkOperations
.getReplaceItemOperation(family.getId(), family, new PartitionKey(family.getLastName())));
container.executeBulkOperations(cosmosItemOperations).blockLast();
}
// </BulkReplaceItems>
// <BulkCreateItemsWithResponseProcessingAndExecutionOptions>
private void bulkCreateItemsWithResponseProcessing(Flux<Family> families) {
Flux<CosmosItemOperation> cosmosItemOperations = families.map(
family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
container.executeBulkOperations(cosmosItemOperations).flatMap(cosmosBulkOperationResponse -> {
CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
CosmosItemOperation cosmosItemOperation = cosmosBulkOperationResponse.getOperation();
if (cosmosBulkOperationResponse.getException() != null) {
logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
} else if (cosmosBulkItemResponse == null ||
!cosmosBulkOperationResponse.getResponse().isSuccessStatusCode()) {
logger.error(
"The operation for Item ID: [{}] Item PartitionKey Value: [{}] did not complete " +
"successfully with " + "a" + " {} response code.",
cosmosItemOperation.<Family>getItem().getId(),
cosmosItemOperation.<Family>getItem().getLastName(),
cosmosBulkItemResponse != null ? cosmosBulkItemResponse.getStatusCode() : "n/a");
} else {
logger.info(
"Item ID: [{}] Item PartitionKey Value: [{}]",
cosmosItemOperation.<Family>getItem().getId(),
cosmosItemOperation.<Family>getItem().getLastName());
logger.info("Status Code: {}", cosmosBulkItemResponse.getStatusCode());
logger.info("Request Charge: {}", cosmosBulkItemResponse.getRequestCharge());
}
if (cosmosBulkItemResponse == null) {
return Mono.error(new IllegalStateException("No response retrieved."));
} else {
return Mono.just(cosmosBulkItemResponse);
}
}).blockLast();
}
private void bulkCreateItemsWithExecutionOptions(Flux<Family> families) {
CosmosBulkExecutionOptions bulkExecutionOptions = new CosmosBulkExecutionOptions();
// The default value for maxMicroBatchConcurrency is 1.
// By increasing it, it means more concurrent requests will be allowed to be sent to the server, which leads to increased RU usage.
//
// Before you increase the value, please examine the RU usage of your container - whether it has been saturated or not.
// When the RU has already been under saturation, increasing the concurrency will not help the situation,
// rather it may cause more 429 and request timeout.
bulkExecutionOptions.setMaxMicroBatchConcurrency(2);
Flux<CosmosItemOperation> cosmosItemOperations = families.map(family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
container.executeBulkOperations(cosmosItemOperations, bulkExecutionOptions).blockLast();
}
// </BulkCreateItemsWithResponseProcessingAndExecutionOptions>
// <BulkWriterAbstraction>
private void bulkUpsertItemsWithBulkWriterAbstraction() {
Family andersenFamilyItem = Families.getAndersenFamilyItem();
Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
BulkWriter bulkWriter = new BulkWriter(container);
bulkWriter.scheduleWrites(andersonItemOperation);
bulkWriter.scheduleWrites(wakeFieldItemOperation);
bulkWriter.execute().subscribe();
}
private void bulkUpsertItemsWithBulkWriterAbstractionAndLocalThroughPutControl() {
ThroughputControlGroupConfig groupConfig =
new ThroughputControlGroupConfigBuilder()
.setGroupName("group1")
.setTargetThroughput(200)
.build();
container.enableLocalThroughputControlGroup(groupConfig);
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    private void bulkCreateItemsWithBulkWriterAbstractionAndGlobalThroughputControl() {
        String controlContainerId = "throughputControlContainer";
        CosmosAsyncContainer controlContainer = database.getContainer(controlContainerId);
        database.createContainerIfNotExists(controlContainer.getId(), "/groupId").block();
        ThroughputControlGroupConfig groupConfig =
            new ThroughputControlGroupConfigBuilder()
                .setGroupName("group-" + UUID.randomUUID())
                .setTargetThroughput(200)
                .build();
        GlobalThroughputControlConfig globalControlConfig = this.client.createGlobalThroughputControlConfigBuilder(this.database.getId(), controlContainerId)
            .setControlItemRenewInterval(Duration.ofSeconds(5))
            .setControlItemExpireInterval(Duration.ofSeconds(20))
            .build();
        container.enableGlobalThroughputControlGroup(groupConfig, globalControlConfig);
        CosmosItemRequestOptions requestOptions = new CosmosItemRequestOptions();
        requestOptions.setThroughputControlGroupName(groupConfig.getGroupName());
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getCreateItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getCreateItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    // </BulkWriterAbstraction>
    private void shutdown() {
        try {
            // Give the fire-and-forget sequences started with subscribe() time to complete
            Thread.sleep(5000);
            // Clean shutdown: block() waits for each delete to finish before the client is closed
            logger.info("Deleting Cosmos DB resources");
            logger.info("-Deleting container...");
            if (container != null) container.delete().block();
            logger.info("-Deleting database...");
            if (database != null) database.delete().block();
            logger.info("-Closing the client...");
        } catch (InterruptedException err) {
            // Restore the interrupt flag instead of swallowing it
            Thread.currentThread().interrupt();
            err.printStackTrace();
        } catch (Exception err) {
            logger.error("Deleting Cosmos DB resources failed, will still attempt to close the client. See stack trace below.");
            err.printStackTrace();
        }
        // The client is null if setup failed before it was built
        if (client != null) client.close();
        logger.info("Done.");
    }
}
- The sample contains methods for bulk create, upsert, replace, patch, and delete. In each method, the family documents in the Flux<Family> stream are mapped to operations by calling the matching factory method in CosmosBulkOperations (for example, getCreateItemOperation), with the family's last name as the partition key value. The resulting Flux<CosmosItemOperation> stream is passed to the executeBulkOperations method of the async container created at the beginning, and the operations are executed in bulk. The bulk create method below is an example:
private void bulkCreateItems(Flux<Family> families) {
    // Map each Family document to a create operation whose partition key value is the family's last name
    Flux<CosmosItemOperation> cosmosItemOperations = families.map(
        family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
    // Execute all operations in bulk; blockLast() waits until the response stream completes.
    // Per-item failures (for example, 409 conflicts) are reported in the individual responses rather than failing the stream.
    container.executeBulkOperations(cosmosItemOperations).blockLast();
}
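To make the mapping step concrete, here is a plain-Java model that does not depend on the SDK. It is a sketch under stated assumptions: Family, OpType, ItemOp, toOperations, groupIntoBatches, and maxBatchSize are hypothetical names, not Cosmos DB types. It turns each document into one operation that carries its partition key value, then groups the operations by that value into size-limited batches, a simplified picture of what "executed in bulk" means. The SDK's own grouping works per physical partition rather than per logical key value, and its batching and concurrency rules are internal, so treat this as an illustration only.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Conceptual model only: Family, OpType and ItemOp are hypothetical stand-ins
// for the sample's Family class and the SDK's CosmosItemOperation.
final class BulkMappingSketch {

    static final class Family {
        final String id;
        final String lastName; // the sample's container is partitioned on /lastName
        Family(String id, String lastName) { this.id = id; this.lastName = lastName; }
    }

    enum OpType { CREATE, UPSERT, REPLACE, DELETE }

    static final class ItemOp {
        final OpType type;
        final String id;
        final String partitionKeyValue;
        ItemOp(OpType type, String id, String partitionKeyValue) {
            this.type = type;
            this.id = id;
            this.partitionKeyValue = partitionKeyValue;
        }
    }

    // Mirrors families.map(family -> CosmosBulkOperations.getXxxItemOperation(...)):
    // one operation per document, tagged with the document's partition key value.
    static List<ItemOp> toOperations(List<Family> families, OpType type) {
        return families.stream()
                .map(f -> new ItemOp(type, f.id, f.lastName))
                .collect(Collectors.toList());
    }

    // Groups operations by partition key value (preserving arrival order) and
    // splits each group into batches of at most maxBatchSize operations.
    static Map<String, List<List<ItemOp>>> groupIntoBatches(List<ItemOp> ops, int maxBatchSize) {
        if (maxBatchSize < 1) {
            throw new IllegalArgumentException("maxBatchSize must be >= 1");
        }
        Map<String, List<List<ItemOp>>> batchesByKey = new LinkedHashMap<>();
        for (ItemOp op : ops) {
            List<List<ItemOp>> batches =
                    batchesByKey.computeIfAbsent(op.partitionKeyValue, k -> new ArrayList<>());
            // Start a new batch when there is none yet or the current one is full
            if (batches.isEmpty() || batches.get(batches.size() - 1).size() == maxBatchSize) {
                batches.add(new ArrayList<>());
            }
            batches.get(batches.size() - 1).add(op);
        }
        return batchesByKey;
    }
}
```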
- There's also a class BulkWriter.java in the same directory as the sample application. This class demonstrates how to handle the rate-limiting (429) and timeout (408) errors that can occur during bulk execution, and how to retry those operations effectively. It's used in the following methods, which also show how to implement local and global throughput control.
private void bulkUpsertItemsWithBulkWriterAbstraction() {
    Family andersenFamilyItem = Families.getAndersenFamilyItem();
    Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
    CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
    CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
    // Queue the operations on the BulkWriter, which retries 429 and 408 responses
    BulkWriter bulkWriter = new BulkWriter(container);
    bulkWriter.scheduleWrites(andersonItemOperation);
    bulkWriter.scheduleWrites(wakeFieldItemOperation);
    // subscribe() starts execution without blocking; the sample's shutdown() waits before cleaning up
    bulkWriter.execute().subscribe();
}

private void bulkUpsertItemsWithBulkWriterAbstractionAndLocalThroughPutControl() {
    // Local throughput control: the 200 RU/s target is enforced within this client instance only
    ThroughputControlGroupConfig groupConfig =
        new ThroughputControlGroupConfigBuilder()
            .setGroupName("group1")
            .setTargetThroughput(200)
            .build();
    container.enableLocalThroughputControlGroup(groupConfig);
    Family andersenFamilyItem = Families.getAndersenFamilyItem();
    Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
    CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
    CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
    BulkWriter bulkWriter = new BulkWriter(container);
    bulkWriter.scheduleWrites(andersonItemOperation);
    bulkWriter.scheduleWrites(wakeFieldItemOperation);
    bulkWriter.execute().subscribe();
}

private void bulkCreateItemsWithBulkWriterAbstractionAndGlobalThroughputControl() {
    // Global throughput control: clients share the budget by coordinating through
    // a separate control container partitioned on /groupId
    String controlContainerId = "throughputControlContainer";
    CosmosAsyncContainer controlContainer = database.getContainer(controlContainerId);
    database.createContainerIfNotExists(controlContainer.getId(), "/groupId").block();
    ThroughputControlGroupConfig groupConfig =
        new ThroughputControlGroupConfigBuilder()
            .setGroupName("group-" + UUID.randomUUID())
            .setTargetThroughput(200)
            .build();
    // Each client renews its control item every 5 seconds; items not renewed within 20 seconds expire
    GlobalThroughputControlConfig globalControlConfig = this.client.createGlobalThroughputControlConfigBuilder(this.database.getId(), controlContainerId)
        .setControlItemRenewInterval(Duration.ofSeconds(5))
        .setControlItemExpireInterval(Duration.ofSeconds(20))
        .build();
    container.enableGlobalThroughputControlGroup(groupConfig, globalControlConfig);
    // Note: requestOptions carries the group name but isn't passed to BulkWriter in this method
    CosmosItemRequestOptions requestOptions = new CosmosItemRequestOptions();
    requestOptions.setThroughputControlGroupName(groupConfig.getGroupName());
    Family andersenFamilyItem = Families.getAndersenFamilyItem();
    Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
    CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getCreateItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
    CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getCreateItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
    BulkWriter bulkWriter = new BulkWriter(container);
    bulkWriter.scheduleWrites(andersonItemOperation);
    bulkWriter.scheduleWrites(wakeFieldItemOperation);
    bulkWriter.execute().subscribe();
}
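The retry decision that the text attributes to BulkWriter.java can be sketched in isolation. This is a minimal, hypothetical policy, not the code in BulkWriter.java: it assumes that only 429 and 408 are retryable, that a positive server-supplied retry-after hint takes precedence when present, and that otherwise the delay doubles on each attempt up to a cap. BulkRetrySketch, maxAttempts, baseDelayMillis, and maxDelayMillis are illustrative names.

```java
// Hypothetical retry policy for bulk operations (not the BulkWriter.java implementation).
final class BulkRetrySketch {

    static final int TOO_MANY_REQUESTS = 429; // rate limited
    static final int REQUEST_TIMEOUT = 408;   // request timed out

    private final int maxAttempts;
    private final long baseDelayMillis;
    private final long maxDelayMillis;

    BulkRetrySketch(int maxAttempts, long baseDelayMillis, long maxDelayMillis) {
        if (maxAttempts < 1 || baseDelayMillis < 1 || maxDelayMillis < baseDelayMillis) {
            throw new IllegalArgumentException("invalid retry settings");
        }
        this.maxAttempts = maxAttempts;
        this.baseDelayMillis = baseDelayMillis;
        this.maxDelayMillis = maxDelayMillis;
    }

    // Only throttling and timeouts are treated as transient; conflicts (409),
    // bad requests (400) and other failures are not retried.
    static boolean isRetryable(int statusCode) {
        return statusCode == TOO_MANY_REQUESTS || statusCode == REQUEST_TIMEOUT;
    }

    // attempt is 1-based: the number of the attempt that just failed.
    boolean shouldRetry(int statusCode, int attempt) {
        return isRetryable(statusCode) && attempt < maxAttempts;
    }

    // Delay in milliseconds before the next attempt. A positive server hint wins;
    // otherwise baseDelayMillis * 2^(attempt - 1). Both results are capped at maxDelayMillis.
    long delayMillis(int attempt, long retryAfterHintMillis) {
        if (retryAfterHintMillis > 0) {
            return Math.min(retryAfterHintMillis, maxDelayMillis);
        }
        long delay = baseDelayMillis;
        // Doubling stops as soon as the delay reaches the cap
        for (int i = 1; i < attempt && delay < maxDelayMillis; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxDelayMillis);
    }
}
```

In a BulkWriter-style loop, an operation whose response fails with a retryable status would be rescheduled after delayMillis(...) while shouldRetry(...) holds, and any other failure would be logged and dropped.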
- The sample also contains bulk create methods that illustrate how to process responses and how to set execution options:
// <BulkCreateItemsWithResponseProcessingAndExecutionOptions>
private void bulkCreateItemsWithResponseProcessing(Flux<Family> families) {
Flux<CosmosItemOperation> cosmosItemOperations = families.map(
family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
container.executeBulkOperations(cosmosItemOperations).flatMap(cosmosBulkOperationResponse -> {
CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
CosmosItemOperation cosmosItemOperation = cosmosBulkOperationResponse.getOperation();
if (cosmosBulkOperationResponse.getException() != null) {
// The operation failed with an exception
logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
} else if (cosmosBulkItemResponse == null || !cosmosBulkItemResponse.isSuccessStatusCode()) {
// The service returned a non-success status code, for example 409 when the item already exists
logger.error(
"The operation for Item ID: [{}] Item PartitionKey Value: [{}] did not complete successfully with a {} response code.",
cosmosItemOperation.<Family>getItem().getId(),
cosmosItemOperation.<Family>getItem().getLastName(),
cosmosBulkItemResponse != null ? cosmosBulkItemResponse.getStatusCode() : "n/a");
} else {
// Success: log the item, the status code, and the RU charge of the operation
logger.info(
"Item ID: [{}] Item PartitionKey Value: [{}]",
cosmosItemOperation.<Family>getItem().getId(),
cosmosItemOperation.<Family>getItem().getLastName());
logger.info("Status Code: {}", cosmosBulkItemResponse.getStatusCode());
logger.info("Request Charge: {}", cosmosBulkItemResponse.getRequestCharge());
}
if (cosmosBulkItemResponse == null) {
// No response at all: terminate the stream with an error, which makes blockLast() throw
return Mono.error(new IllegalStateException("No response retrieved."));
} else {
return Mono.just(cosmosBulkItemResponse);
}
}).blockLast();
}
private void bulkCreateItemsWithExecutionOptions(Flux<Family> families) {
CosmosBulkExecutionOptions bulkExecutionOptions = new CosmosBulkExecutionOptions();
// The default value for maxMicroBatchConcurrency is 1.
// Increasing it allows more concurrent requests to be sent to the server, which increases RU usage.
//
// Before you increase the value, check whether your container's RU/s are already saturated.
// If they are, increasing the concurrency won't help;
// instead, it may cause more 429 (throttled) responses and request timeouts.
bulkExecutionOptions.setMaxMicroBatchConcurrency(2);
Flux<CosmosItemOperation> cosmosItemOperations = families.map(
family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
container.executeBulkOperations(cosmosItemOperations, bulkExecutionOptions).blockLast();
}
// </BulkCreateItemsWithResponseProcessingAndExecutionOptions>
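The response-processing method above logs one line per operation. If you also want a summary, for example to judge whether raising maxMicroBatchConcurrency is worthwhile, you could tally status codes as they arrive. The following JDK-only sketch is hypothetical: the BulkResponseTally class, its Outcome categories, and the mayIncreaseConcurrency rule are illustrative assumptions, not SDK behavior. In a real run, the status codes would come from CosmosBulkItemResponse.getStatusCode(). Because the SDK may retry throttled bulk operations internally, a low 429 count is only a coarse signal, so also check your container's RU consumption metrics.

```java
import java.util.Arrays;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

class BulkResponseTally {
    // Coarse outcome categories for bulk item status codes (hypothetical grouping).
    enum Outcome { SUCCESS, CONFLICT, THROTTLED, TIMEOUT, OTHER_FAILURE }

    // Maps an HTTP status code to an outcome category.
    static Outcome classify(int statusCode) {
        if (statusCode >= 200 && statusCode < 300) return Outcome.SUCCESS;
        if (statusCode == 409) return Outcome.CONFLICT;  // an item with the same id already exists
        if (statusCode == 429) return Outcome.THROTTLED; // request rate is too large
        if (statusCode == 408) return Outcome.TIMEOUT;   // request timeout
        return Outcome.OTHER_FAILURE;
    }

    // Counts how many status codes fall into each category; every category starts at 0.
    static Map<Outcome, Integer> tally(List<Integer> statusCodes) {
        Map<Outcome, Integer> counts = new EnumMap<>(Outcome.class);
        for (Outcome outcome : Outcome.values()) {
            counts.put(outcome, 0);
        }
        for (int code : statusCodes) {
            counts.merge(classify(code), 1, Integer::sum);
        }
        return counts;
    }

    // Hypothetical policy: consider raising maxMicroBatchConcurrency only when the
    // previous run saw no throttled (429) or timed-out (408) responses.
    static boolean mayIncreaseConcurrency(Map<Outcome, Integer> counts) {
        return counts.get(Outcome.THROTTLED) == 0 && counts.get(Outcome.TIMEOUT) == 0;
    }

    public static void main(String[] args) {
        // Status codes as they might be collected from CosmosBulkItemResponse.getStatusCode()
        Map<Outcome, Integer> counts = tally(Arrays.asList(201, 201, 409, 429));
        System.out.println(counts);
        System.out.println("Raise concurrency? " + mayIncreaseConcurrency(counts));
    }
}
```

A 409 shows up here as a conflict rather than a throttling signal, which matches the sample's second create pass over the same families.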
Performance tips
Consider the following points for better performance when using the bulk executor library:
For best performance, run your application from an Azure VM in the same region as your Azure Cosmos DB account's write region.
To achieve higher throughput:
- Set the JVM's heap size large enough to avoid memory issues when handling a large number of documents. Suggested heap size: max(3 GB, 3 * sizeof(all documents passed to the bulk import API in one batch)).
- Because each bulk call incurs preprocessing time, you get higher throughput when each call handles a large number of documents. For example, to import 10,000,000 documents, running bulk import 10 times on batches of 1,000,000 documents each is preferable to running it 100 times on batches of 100,000 documents each.
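Both rules of thumb above reduce to simple arithmetic. The following sketch computes the suggested heap size for one batch and the number of bulk calls needed for a given batch size. BulkSizing and its methods are hypothetical names, and treating "GB" as GiB (1024^3 bytes) is an assumption.

```java
class BulkSizing {
    // Assumption: "GB" in the guideline is treated as GiB (1024^3 bytes).
    static final long GIB = 1024L * 1024L * 1024L;

    // Suggested heap size: max(3 GB, 3 * size of all documents passed in one batch).
    static long suggestedHeapBytes(long batchPayloadBytes) {
        return Math.max(3 * GIB, 3 * batchPayloadBytes);
    }

    // Number of bulk calls needed for totalDocs at batchSize documents per call (ceiling division).
    static long bulkCalls(long totalDocs, long batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        return (totalDocs + batchSize - 1) / batchSize;
    }

    public static void main(String[] args) {
        // Hypothetical payload: 1,000,000 documents of about 1 KiB each
        long batchPayload = 1_000_000L * 1024L;
        System.out.println("Suggested heap (bytes): " + suggestedHeapBytes(batchPayload));
        System.out.println("Calls with 1,000,000-document batches: " + bulkCalls(10_000_000L, 1_000_000L));
        System.out.println("Calls with 100,000-document batches: " + bulkCalls(10_000_000L, 100_000L));
    }
}
```

You could then pass the resulting heap size to the JVM at startup with the standard -Xmx option, for example -Xmx3g.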
It is recommended to instantiate a single CosmosAsyncClient object for the entire application within a single virtual machine that corresponds to a specific Azure Cosmos DB container.
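This recommendation amounts to a process-wide singleton. One common way to build one in Java is the initialization-on-demand holder idiom, sketched below. ClientHolder and ExpensiveClient are hypothetical names; ExpensiveClient stands in for CosmosAsyncClient so that the example runs without the SDK.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for CosmosAsyncClient so the sketch runs without the SDK.
class ExpensiveClient {
    static final AtomicInteger CREATED = new AtomicInteger();

    ExpensiveClient() {
        CREATED.incrementAndGet(); // track how many instances were built
    }
}

final class ClientHolder {
    private ClientHolder() {
    }

    // Initialization-on-demand holder idiom: the JVM initializes Holder lazily,
    // exactly once and thread-safely, on the first call to get().
    private static final class Holder {
        // With the real SDK, this would build the client once, for example:
        // new CosmosClientBuilder().endpoint(...).key(...).buildAsyncClient()
        static final ExpensiveClient INSTANCE = new ExpensiveClient();
    }

    static ExpensiveClient get() {
        return Holder.INSTANCE;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread[] threads = new Thread[4];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(ClientHolder::get); // every thread asks for the client
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println("Clients created: " + ExpensiveClient.CREATED.get());
    }
}
```

Every bulk call in the process then shares ClientHolder.get(), and the client is closed once at application shutdown, as the sample's shutdown() method does with client.close().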
A single bulk operation API execution consumes a large share of the client machine's CPU and network I/O, because it spawns multiple tasks internally. Therefore, avoid spawning multiple concurrent tasks within your application process that each execute bulk operation API calls. If a single bulk operation API call running on a single virtual machine can't consume your entire container's throughput (which can happen when your container's throughput exceeds 1 million RU/s), it's preferable to create separate virtual machines that execute bulk operation API calls concurrently.
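One way to follow this advice inside a single process is fan-in: let any number of producers enqueue operations, and have one consumer hand them to one bulk execution at a time. The sample's BulkWriter usage has a similar shape, scheduling operations with scheduleWrites and running them through one execute() call. The JDK-only sketch below uses hypothetical names and plain strings in place of CosmosItemOperation; each completed batch marks where one sequential bulk execution would go.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

class SingleBulkPipeline {
    // Producers enqueue items concurrently; a single consumer (the calling thread)
    // drains the queue and groups items into batches. Each batch marks where one
    // sequential bulk execution would run, instead of one bulk call per producer.
    static List<List<String>> run(int producers, int itemsPerProducer, int batchSize) {
        if (producers <= 0 || itemsPerProducer < 0 || batchSize <= 0) {
            throw new IllegalArgumentException("invalid sizes");
        }
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(producers);
        for (int p = 0; p < producers; p++) {
            final int producerId = p;
            pool.submit(() -> {
                for (int i = 0; i < itemsPerProducer; i++) {
                    queue.add("producer" + producerId + "-item" + i);
                }
            });
        }
        pool.shutdown(); // accept no new producers; submitted ones keep running

        int expected = producers * itemsPerProducer;
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        try {
            for (int received = 0; received < expected; received++) {
                current.add(queue.take()); // blocks until some producer has enqueued an item
                if (current.size() == batchSize) {
                    batches.add(current); // hand this batch to the single bulk execution
                    current = new ArrayList<>();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt and return what was collected
        }
        if (!current.isEmpty()) {
            batches.add(current); // final partial batch
        }
        return batches;
    }

    public static void main(String[] args) {
        List<List<String>> batches = run(4, 250, 100);
        int total = 0;
        for (List<String> batch : batches) {
            total += batch.size();
        }
        System.out.println(batches.size() + " batches, " + total + " items");
    }
}
```

With the real SDK, you would more likely feed all producers into one Flux passed to a single executeBulkOperations call, which batches internally; the queue here only illustrates the single-consumer shape.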
Next steps
- For an overview of bulk executor functionality, see bulk executor overview.