Use bulk executor Java library to perform bulk operations on Azure Cosmos DB data

This tutorial provides instructions on using the Azure Cosmos DB bulk executor Java library to import and update Azure Cosmos DB documents. To learn about the bulk executor library and how it helps you leverage massive throughput and storage, see the bulk executor library overview article. In this tutorial, you build a Java application that generates random documents and bulk imports them into an Azure Cosmos container. After importing, you bulk update some properties of a document.

Currently, the bulk executor library is supported only by Azure Cosmos DB SQL API and Gremlin API accounts. This article describes how to use the bulk executor Java library with SQL API accounts. To learn about using the bulk executor .NET library with the Gremlin API, see perform bulk operations in Azure Cosmos DB Gremlin API. The bulk executor library described here is available only for the Azure Cosmos DB Java sync SDK v2, and it is the current recommended solution for Java bulk support. It is not available for the 3.x, 4.x, or higher SDK versions.

Prerequisites

Clone the sample application

Now let's switch to working with code by downloading a sample Java application from GitHub. This application performs bulk operations on Azure Cosmos DB data. To clone the application, open a command prompt, navigate to the directory where you want to copy the application, and run the following command:

 git clone https://github.com/Azure/azure-cosmosdb-bulkexecutor-java-getting-started.git 

The cloned repository contains two samples, "bulkimport" and "bulkupdate", under the "\azure-cosmosdb-bulkexecutor-java-getting-started\samples\bulkexecutor-sample\src\main\java\com\microsoft\azure\cosmosdb\bulkexecutor" folder. The "bulkimport" application generates random documents and imports them to Azure Cosmos DB. The "bulkupdate" application updates some documents in Azure Cosmos DB. In the next sections, we will review the code in each of these sample apps.

Bulk import data to Azure Cosmos DB

  1. The Azure Cosmos DB connection strings are read as arguments and assigned to variables defined in the CmdLineConfiguration.java file.

  2. Next, the DocumentClient object is initialized by using the following statements:

    // Create a DocumentClient with a large connection pool for bulk operations
    ConnectionPolicy connectionPolicy = new ConnectionPolicy();
    connectionPolicy.setMaxPoolSize(1000);
    DocumentClient client = new DocumentClient(
      HOST,
      MASTER_KEY,
      connectionPolicy,
      ConsistencyLevel.Session);
    
  3. Initialize the DocumentBulkExecutor object. The client's retry options for wait time and throttled requests are first set high for initialization; they are then set to 0 to pass congestion control to DocumentBulkExecutor for its lifetime.

    // Set client's retry options high for initialization
    client.getConnectionPolicy().getRetryOptions().setMaxRetryWaitTimeInSeconds(30);
    client.getConnectionPolicy().getRetryOptions().setMaxRetryAttemptsOnThrottledRequests(9);

    // Builder pattern
    Builder bulkExecutorBuilder = DocumentBulkExecutor.builder().from(
      client,
      DATABASE_NAME,
      COLLECTION_NAME,
      collection.getPartitionKey(),
      offerThroughput); // throughput you want to allocate for bulk import out of the container's total throughput

    // Instantiate DocumentBulkExecutor
    DocumentBulkExecutor bulkExecutor = bulkExecutorBuilder.build();

    // Set retries to 0 to pass complete control to bulk executor
    client.getConnectionPolicy().getRetryOptions().setMaxRetryWaitTimeInSeconds(0);
    client.getConnectionPolicy().getRetryOptions().setMaxRetryAttemptsOnThrottledRequests(0);
    
  4. Call the importAll API to bulk import the generated random documents into an Azure Cosmos container. You can configure the command-line configurations within the CmdLineConfiguration.java file.

    BulkImportResponse bulkImportResponse = bulkExecutor.importAll(documents, false, true, null);
    
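    The documents variable in the call above is a Collection<String> of JSON-serialized documents. As a minimal, hypothetical sketch of how such a collection could be built (the field names are illustrative; "profileid" matches the partition key used in the command below):

    // Build a collection of JSON-serialized documents to import
    // (uses java.util.ArrayList, java.util.Collection, and java.util.UUID)
    Collection<String> documents = new ArrayList<>();
    for (int i = 0; i < 100000; i++) {
        String id = UUID.randomUUID().toString();
        documents.add(String.format("{\"id\": \"%s\", \"profileid\": \"%s\"}", id, id));
    }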

    The bulk import API accepts a collection of JSON-serialized documents and has the following syntax. For more information, see the API documentation:

    public BulkImportResponse importAll(
        Collection<String> documents,
        boolean isUpsert,
        boolean disableAutomaticIdGeneration,
        Integer maxConcurrencyPerPartitionRange) throws DocumentClientException;   
    

    The importAll method accepts the following parameters:

    isUpsert: A flag to enable upsert of the documents. If a document with the given ID already exists, it's updated.
    disableAutomaticIdGeneration: A flag to disable automatic generation of IDs. By default, it is set to true.
    maxConcurrencyPerPartitionRange: The maximum degree of concurrency per partition key range. The default value is 20.
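
    For example, to import the same collection as an upsert (illustrative flag values; the documents must already carry string IDs because automatic ID generation is disabled):

    // isUpsert = true: documents whose IDs already exist are updated in place
    BulkImportResponse upsertResponse = bulkExecutor.importAll(documents, true, true, null);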

    Bulk import response object definition: The result of the bulk import API call contains the following get methods:

    int getNumberOfDocumentsImported(): The total number of documents that were successfully imported out of the documents supplied to the bulk import API call.
    double getTotalRequestUnitsConsumed(): The total request units (RU) consumed by the bulk import API call.
    Duration getTotalTimeTaken(): The total time taken by the bulk import API call to complete execution.
    List<Exception> getErrors(): Gets the list of errors if some documents out of the batch supplied to the bulk import API call failed to get inserted.
    List<Object> getBadInputDocuments(): The list of badly formatted documents that were not successfully imported in the bulk import API call. Fix the documents returned and retry the import. Badly formatted documents include documents whose ID value is not a string (null or any other data type is considered invalid).
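
    As a minimal sketch of consuming this response (the getters are the ones listed above; the logging style is illustrative):

    // Inspect the outcome of the bulk import call
    System.out.println("Documents imported: " + bulkImportResponse.getNumberOfDocumentsImported());
    System.out.println("RUs consumed: " + bulkImportResponse.getTotalRequestUnitsConsumed());
    System.out.println("Time taken: " + bulkImportResponse.getTotalTimeTaken());

    // Surface failures so badly formatted documents can be fixed and retried
    for (Exception e : bulkImportResponse.getErrors()) {
        e.printStackTrace();
    }
    for (Object badDocument : bulkImportResponse.getBadInputDocuments()) {
        System.err.println("Fix and retry: " + badDocument);
    }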
  5. After you have the bulk import application ready, build the command-line tool from source by using the 'mvn clean package' command. This command generates a jar file in the target folder:

    mvn clean package
    
  6. After the target dependencies are generated, you can invoke the bulk importer application by using the following command:

    java -Xmx12G -jar bulkexecutor-sample-1.0-SNAPSHOT-jar-with-dependencies.jar -serviceEndpoint *<Fill in your Azure Cosmos DB's endpoint>* -masterKey *<Fill in your Azure Cosmos DB's primary key>* -databaseId bulkImportDb -collectionId bulkImportColl -operation import -shouldCreateCollection -collectionThroughput 1000000 -partitionKey /profileid -maxConnectionPoolSize 6000 -numberOfDocumentsForEachCheckpoint 1000000 -numberOfCheckpoints 10
    

    The bulk importer creates a new database and a collection with the database name, collection name, and throughput values specified in the command-line arguments, which are parsed in the CmdLineConfiguration.java file.

Bulk update data in Azure Cosmos DB

You can update existing documents by using the updateAll API. In this example, you set the Name field to a new value and remove the Description field from the existing documents. For the full set of supported field update operations, see the API documentation.

  1. Define the update items along with the corresponding field update operations. This example uses SetUpdateOperation to update the Name field and UnsetUpdateOperation to remove the Description field from all the documents. You can also perform other operations, such as incrementing a document field by a specific value, pushing specific values into an array field, or removing a specific value from an array field. To learn about the different methods provided by the bulk update API, see the API documentation.

    // Define the field update operations: set Name, remove Description
    SetUpdateOperation<String> nameUpdate = new SetUpdateOperation<>("Name", "UpdatedDocValue");
    UnsetUpdateOperation descriptionUpdate = new UnsetUpdateOperation("description");

    ArrayList<UpdateOperationBase> updateOperations = new ArrayList<>();
    updateOperations.add(nameUpdate);
    updateOperations.add(descriptionUpdate);

    // Build one UpdateItem per document; each is identified by its ID and partition key value
    List<UpdateItem> updateItems = new ArrayList<>(cfg.getNumberOfDocumentsForEachCheckpoint());
    IntStream.range(0, cfg.getNumberOfDocumentsForEachCheckpoint()).mapToObj(j -> {
        return new UpdateItem(Long.toString(prefix + j), Long.toString(prefix + j), updateOperations);
    }).collect(Collectors.toCollection(() -> updateItems));
    
  2. Call the updateAll API to bulk update the documents in the Azure Cosmos container. You can configure the command-line configurations to be passed in the CmdLineConfiguration.java file.

    BulkUpdateResponse bulkUpdateResponse = bulkExecutor.updateAll(updateItems, null);
    

    The bulk update API accepts a collection of items to be updated. Each update item specifies the list of field update operations to be performed on a document identified by an ID and a partition key value. For more details, see the API documentation:

    public BulkUpdateResponse updateAll(
        Collection<UpdateItem> updateItems,
        Integer maxConcurrencyPerPartitionRange) throws DocumentClientException;
    

    The updateAll method accepts the following parameters:

    maxConcurrencyPerPartitionRange: The maximum degree of concurrency per partition key range. The default value is 20.

    Bulk update response object definition: The result of the bulk update API call contains the following get methods:

    int getNumberOfDocumentsUpdated(): The total number of documents that were successfully updated out of the documents supplied to the bulk update API call.
    double getTotalRequestUnitsConsumed(): The total request units (RU) consumed by the bulk update API call.
    Duration getTotalTimeTaken(): The total time taken by the bulk update API call to complete execution.
    List<Exception> getErrors(): Gets the list of operational or networking issues related to the update operation.
    List<BulkUpdateFailure> getFailedUpdates(): Gets the list of updates that could not be completed, along with the specific exceptions leading to the failures.
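
    As with bulk import, a minimal sketch of consuming this response (getters as listed above; the logging style is illustrative):

    // Inspect the outcome of the bulk update call
    System.out.println("Documents updated: " + bulkUpdateResponse.getNumberOfDocumentsUpdated());
    System.out.println("RUs consumed: " + bulkUpdateResponse.getTotalRequestUnitsConsumed());

    // Updates that could not be completed, with the exceptions that caused them
    for (BulkUpdateFailure failedUpdate : bulkUpdateResponse.getFailedUpdates()) {
        System.err.println("Failed update: " + failedUpdate);
    }
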
  3. After you have the bulk update application ready, build the command-line tool from source by using the 'mvn clean package' command. This command generates a jar file in the target folder:

    mvn clean package
    
  4. After the target dependencies are generated, you can invoke the bulk update application by using the following command:

    java -Xmx12G -jar bulkexecutor-sample-1.0-SNAPSHOT-jar-with-dependencies.jar -serviceEndpoint *<Fill in your Azure Cosmos DB's endpoint>* -masterKey *<Fill in your Azure Cosmos DB's primary key>* -databaseId bulkUpdateDb -collectionId bulkUpdateColl -operation update -collectionThroughput 1000000 -partitionKey /profileid -maxConnectionPoolSize 6000 -numberOfDocumentsForEachCheckpoint 1000000 -numberOfCheckpoints 10
    

Performance tips

Consider the following points for better performance when using the bulk executor library:

  • For best performance, run your application from an Azure VM in the same region as your Cosmos DB account's write region.

  • For achieving higher throughput:

    • Set the JVM's heap size to a number large enough to avoid any memory issues when handling a large number of documents. Suggested heap size: max(3GB, 3 * sizeof(all documents passed to the bulk import API in one batch)).
    • There is a preprocessing time, so you get higher throughput when performing bulk operations with a large number of documents. If you want to import 10,000,000 documents, running bulk import 10 times on 10 batches of 1,000,000 documents each is preferable to running bulk import 100 times on 100 batches of 100,000 documents each. (A sketch of this batching pattern appears after this list.)
  • It is recommended to instantiate a single DocumentBulkExecutor object for the entire application within a single virtual machine that corresponds to a specific Azure Cosmos container.

  • A single bulk operation API execution consumes a large chunk of the client machine's CPU and network IO because it spawns multiple tasks internally, so avoid spawning multiple concurrent tasks within your application process that each execute bulk operation API calls. If a single bulk operation API call running on a single virtual machine cannot consume your entire container's throughput (if your container's throughput > 1 million RU/s), it's preferable to create separate virtual machines to concurrently execute bulk operation API calls.
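
The following is a minimal sketch of the batching pattern recommended above, assuming the documents are already serialized in a List<String> named allDocuments and a DocumentBulkExecutor has been built as in the earlier steps (the batch size and variable names are illustrative):

    // Import documents in large batches; bigger batches amortize the preprocessing cost
    int batchSize = 1_000_000;
    for (int start = 0; start < allDocuments.size(); start += batchSize) {
        List<String> batch = allDocuments.subList(start, Math.min(start + batchSize, allDocuments.size()));
        BulkImportResponse response = bulkExecutor.importAll(batch, false, true, null);
        System.out.println("Imported " + response.getNumberOfDocumentsImported() + " documents in this batch");
    }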

Next steps