Use the bulk executor .NET library to perform bulk operations in Azure Cosmos DB

APPLIES TO: SQL API

Note

The bulk executor library described in this article is maintained for applications that use version 2.x of the .NET SDK. For new applications, use the bulk support that is available directly in version 3.x of the .NET SDK; it does not require any external library.

If you are currently using the bulk executor library and planning to migrate to bulk support on the newer SDK, use the steps in the Migration guide to migrate your application.

This tutorial provides instructions on using the bulk executor .NET library to import and update documents in an Azure Cosmos container. To learn about the bulk executor library and how it helps you leverage massive throughput and storage, see the bulk executor library overview article. In this tutorial, you will see a sample .NET application that bulk imports randomly generated documents into an Azure Cosmos container. After importing, it shows how you can bulk update the imported data by specifying patches as operations to perform on specific document fields.

Currently, the bulk executor library is supported only by Azure Cosmos DB SQL API and Gremlin API accounts. This article describes how to use the bulk executor .NET library with SQL API accounts. To learn about using the bulk executor .NET library with Gremlin API accounts, see Perform bulk operations in the Azure Cosmos DB Gremlin API.

Prerequisites

Clone the sample application

Now let's switch to working with code by downloading a sample .NET application from GitHub. This application performs bulk operations on the data stored in your Azure Cosmos account. To clone the application, open a command prompt, navigate to the directory where you want to copy it, and run the following command:

git clone https://github.com/Azure/azure-cosmosdb-bulkexecutor-dotnet-getting-started.git

The cloned repository contains two samples, "BulkImportSample" and "BulkUpdateSample". You can open either sample application, update the connection strings in the App.config file with your Azure Cosmos DB account's connection strings, build the solution, and run it.
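
For reference, here is a minimal sketch of the appSettings section that the samples read. The key names match the ConfigurationManager lookups shown in the next section; the endpoint, key, database, container, and throughput values are placeholders to replace with your own.

    <appSettings>
      <add key="EndPointUrl" value="https://<your-account>.documents.azure.com:443/" />
      <add key="AuthorizationKey" value="<your-account-key>" />
      <add key="DatabaseName" value="bulkImportDb" />
      <add key="CollectionName" value="bulkImportColl" />
      <add key="CollectionThroughput" value="10000" />
    </appSettings>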

The "BulkImportSample" application generates random documents and bulk imports them into your Azure Cosmos account. The "BulkUpdateSample" application bulk updates the imported documents by specifying patches as operations to perform on specific document fields. In the next sections, you will review the code in each of these sample apps.

Bulk import data to an Azure Cosmos account

  1. Navigate to the "BulkImportSample" folder and open the "BulkImportSample.sln" file.

  2. The Azure Cosmos DB connection strings are retrieved from the App.config file, as shown in the following code:

    private static readonly string EndpointUrl = ConfigurationManager.AppSettings["EndPointUrl"];
    private static readonly string AuthorizationKey = ConfigurationManager.AppSettings["AuthorizationKey"];
    private static readonly string DatabaseName = ConfigurationManager.AppSettings["DatabaseName"];
    private static readonly string CollectionName = ConfigurationManager.AppSettings["CollectionName"];
    private static readonly int CollectionThroughput = int.Parse(ConfigurationManager.AppSettings["CollectionThroughput"]);
    

    The bulk importer creates a new database and a container with the database name, container name, and throughput values specified in the App.config file.

  3. Next, the DocumentClient object is initialized with Direct TCP connection mode:

    ConnectionPolicy connectionPolicy = new ConnectionPolicy
    {
      ConnectionMode = ConnectionMode.Direct,
      ConnectionProtocol = Protocol.Tcp
    };
    DocumentClient client = new DocumentClient(new Uri(endpointUrl), authorizationKey, connectionPolicy);
    
  4. The BulkExecutor object is initialized with a high retry value for wait time and throttled requests. These values are then set to 0 to pass congestion control to the BulkExecutor for its lifetime.

    // Set retry options high during initialization (default values).
    client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;
    client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;
    
    IBulkExecutor bulkExecutor = new BulkExecutor(client, dataCollection);
    await bulkExecutor.InitializeAsync();
    
    // Set retries to 0 to pass complete control to bulk executor.
    client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
    client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;
    
  5. The application invokes the BulkImportAsync API. The .NET library provides two overloads of the bulk import API: one that accepts a list of serialized JSON documents, and one that accepts a list of deserialized POCO documents. To learn more about the definitions of each of these overloaded methods, refer to the API documentation.

    BulkImportResponse bulkImportResponse = await bulkExecutor.BulkImportAsync(
     documents: documentsToImportInBatch,
     enableUpsert: true,
     disableAutomaticIdGeneration: true,
     maxConcurrencyPerPartitionKeyRange: null,
     maxInMemorySortingBatchSize: null,
     cancellationToken: token);
    

    The BulkImportAsync method accepts the following parameters:

    enableUpsert: A flag to enable upsert operations on the documents. If a document with the given ID already exists, it's updated. By default, it is set to false.
    disableAutomaticIdGeneration: A flag to disable automatic generation of IDs. By default, it is set to true.
    maxConcurrencyPerPartitionKeyRange: The maximum degree of concurrency per partition key range. Setting this to null causes the library to use the default value of 20.
    maxInMemorySortingBatchSize: The maximum number of documents pulled from the document enumerator passed to the API call in each stage. For the in-memory sorting phase that happens before bulk importing, setting this parameter to null causes the library to use the default value of min(documents.count, 1000000).
    cancellationToken: The cancellation token to gracefully exit the bulk import operation.

    Bulk import response object definition: The result of the bulk import API call contains the following attributes:

    NumberOfDocumentsImported (long): The total number of documents that were successfully imported, out of the total documents supplied to the bulk import API call.
    TotalRequestUnitsConsumed (double): The total request units (RUs) consumed by the bulk import API call.
    TotalTimeTaken (TimeSpan): The total time taken by the bulk import API call to complete the execution.
    BadInputDocuments (List<object>): The list of bad-format documents that were not successfully imported in the bulk import API call. Fix the documents returned and retry the import. Bad-format documents include documents whose ID value is not a string (null or any other data type is considered invalid).
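
    As an illustration of how the response can be consumed, the following is a minimal sketch (not part of the sample itself) that logs these attributes and flags bad-format documents for a retry. It assumes the bulkImportResponse variable returned by the BulkImportAsync call shown earlier.

    // Minimal sketch: inspect the BulkImportResponse returned by the call above.
    Console.WriteLine("Imported {0} documents in {1} seconds, consuming {2} RUs in total.",
        bulkImportResponse.NumberOfDocumentsImported,
        bulkImportResponse.TotalTimeTaken.TotalSeconds,
        bulkImportResponse.TotalRequestUnitsConsumed);

    // BadInputDocuments contains documents that failed validation (for example, a non-string id).
    // Fix these documents and retry the import.
    if (bulkImportResponse.BadInputDocuments != null && bulkImportResponse.BadInputDocuments.Count > 0)
    {
        Console.WriteLine("{0} documents were rejected as bad input.", bulkImportResponse.BadInputDocuments.Count);
    }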

Bulk update data in your Azure Cosmos account

You can update existing documents by using the BulkUpdateAsync API. In this example, you set the Name field to a new value and remove the Description field from the existing documents. For the full set of supported update operations, refer to the API documentation.

  1. Navigate to the "BulkUpdateSample" folder and open the "BulkUpdateSample.sln" file.

  2. Define the update items along with the corresponding field update operations. In this example, you use SetUpdateOperation to update the Name field and UnsetUpdateOperation to remove the Description field from all the documents. You can also perform other operations, such as incrementing a document field by a specific value, pushing specific values into an array field, or removing a specific value from an array field. To learn about the different methods provided by the bulk update API, refer to the API documentation.

    SetUpdateOperation<string> nameUpdate = new SetUpdateOperation<string>("Name", "UpdatedDoc");
    UnsetUpdateOperation descriptionUpdate = new UnsetUpdateOperation("description");
    
    List<UpdateOperation> updateOperations = new List<UpdateOperation>();
    updateOperations.Add(nameUpdate);
    updateOperations.Add(descriptionUpdate);
    
    List<UpdateItem> updateItems = new List<UpdateItem>();
    for (int i = 0; i < 10; i++)
    {
        updateItems.Add(new UpdateItem(i.ToString(), i.ToString(), updateOperations));
    }
    
  3. The application invokes the BulkUpdateAsync API. To learn about the definition of the BulkUpdateAsync method, refer to the API documentation.

    BulkUpdateResponse bulkUpdateResponse = await bulkExecutor.BulkUpdateAsync(
      updateItems: updateItems,
      maxConcurrencyPerPartitionKeyRange: null,
      maxInMemorySortingBatchSize: null,
      cancellationToken: token);
    

    The BulkUpdateAsync method accepts the following parameters:

    maxConcurrencyPerPartitionKeyRange: The maximum degree of concurrency per partition key range. Setting this parameter to null causes the library to use the default value of 20.
    maxInMemorySortingBatchSize: The maximum number of update items pulled from the update items enumerator passed to the API call in each stage. For the in-memory sorting phase that happens before bulk updating, setting this parameter to null causes the library to use the default value of min(updateItems.count, 1000000).
    cancellationToken: The cancellation token to gracefully exit the bulk update operation.

    Bulk update response object definition: The result of the bulk update API call contains the following attributes:

    NumberOfDocumentsUpdated (long): The number of documents that were successfully updated, out of the total documents supplied to the bulk update API call.
    TotalRequestUnitsConsumed (double): The total request units (RUs) consumed by the bulk update API call.
    TotalTimeTaken (TimeSpan): The total time taken by the bulk update API call to complete the execution.
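
    In the same spirit, here is a minimal sketch (not part of the sample) of how this response might be checked. It assumes the bulkUpdateResponse variable returned by the BulkUpdateAsync call above and the updateItems list defined in step 2.

    // Minimal sketch: verify that every update item supplied above was applied.
    Console.WriteLine("Updated {0} of {1} documents in {2} seconds, consuming {3} RUs in total.",
        bulkUpdateResponse.NumberOfDocumentsUpdated,
        updateItems.Count,
        bulkUpdateResponse.TotalTimeTaken.TotalSeconds,
        bulkUpdateResponse.TotalRequestUnitsConsumed);

    // If fewer documents were updated than update items supplied, inspect the trace output
    // for throttling or missing documents and retry the remaining items.
    if (bulkUpdateResponse.NumberOfDocumentsUpdated < updateItems.Count)
    {
        Console.WriteLine("Not all update items were applied.");
    }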

Performance tips

Consider the following points for better performance when using the bulk executor library:

  • For best performance, run your application from an Azure virtual machine that is in the same region as your Azure Cosmos account's write region.

  • It is recommended that you instantiate a single BulkExecutor object for the whole application within a single virtual machine, corresponding to a specific Azure Cosmos container. A minimal sketch of this reuse pattern appears after this list.

  • A single bulk operation API execution consumes a large chunk of the client machine's CPU and network IO, because it spawns multiple tasks internally. Avoid spawning multiple concurrent tasks within your application process that execute bulk operation API calls. If a single bulk operation API call running on a single virtual machine is unable to consume your entire container's throughput (if your container's throughput is greater than 1 million RU/s), it's preferable to create separate virtual machines to concurrently execute the bulk operation API calls.

  • Ensure that the InitializeAsync() method is invoked after instantiating a BulkExecutor object, to fetch the target Cosmos container's partition map.

  • In your application's App.Config, ensure gcServer is enabled for better performance:

    <runtime>
      <gcServer enabled="true" />
    </runtime>
    
  • The library emits traces that can be collected either into a log file or on the console. To enable both, add the following code to your application's App.Config file.

    <system.diagnostics>
      <trace autoflush="false" indentsize="4">
        <listeners>
          <add name="logListener" type="System.Diagnostics.TextWriterTraceListener" initializeData="application.log" />
          <add name="consoleListener" type="System.Diagnostics.ConsoleTraceListener" />
        </listeners>
      </trace>
    </system.diagnostics>
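
A minimal sketch of the reuse pattern recommended above, under these assumptions: bulkExecutor is the single, already initialized instance created as shown in the import sample, token is a CancellationToken, and documentBatches is a hypothetical collection that yields batches shaped like documentsToImportInBatch.

    // One BulkExecutor per target container, reused sequentially for every batch.
    // Each call already parallelizes internally across partition key ranges, so avoid
    // issuing concurrent bulk calls from the same process.
    foreach (var documentsToImportInBatch in documentBatches)
    {
        BulkImportResponse response = await bulkExecutor.BulkImportAsync(
            documents: documentsToImportInBatch,
            enableUpsert: true,
            disableAutomaticIdGeneration: true,
            maxConcurrencyPerPartitionKeyRange: null,
            maxInMemorySortingBatchSize: null,
            cancellationToken: token);

        Console.WriteLine("Imported {0} documents at a cost of {1} RUs.",
            response.NumberOfDocumentsImported,
            response.TotalRequestUnitsConsumed);
    }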
    

Next steps