使用 Azure SDK for JavaScript 在 Azure Cosmos DB 的 NoSQL 模式中执行批量操作

本指南中使用 Azure SDK for JavaScript 的 executeBulkOperations 功能在 Azure Cosmos DB for NoSQL 中执行批量创建和更新操作。 此功能可调整每个分区的并发处理,自动处理每项操作的重试逻辑,并执行大量操作(100+)。 此功能通过在调用成功且不受限制时增加并发性并在发生限制时缩减来自动动态调整并发性。

先决条件

  • 一个现有的 Azure Cosmos DB for NoSQL 帐户

  • Node.js 22.x 或更高版本

初始化开发环境

首先,使用初始项目和所有开发人员依赖项创建开发环境。 对于此项目,你将把来自@azure/cosmos 安装为开发依赖项。

  1. 在空文件夹中打开终端。

  2. 初始化新的 Node.js 项目。

    npm init
    
  3. 使用此内容更新新生成的 package.json 文件。

    {
      "main": "app.js",
      "scripts": {
        "start": "node app.js"
      }
    }
    
  4. 从节点包管理器 (npm) 安装@azure/cosmos包。

    npm install --save @azure/cosmos
    

    重要

    批量执行功能在 Azure SDK for JavaScript 中的库版本 4.3 或更高版本 @azure/cosmos 中可用。 如果使用的是旧版 SDK,则必须迁移到较新版本才能使用这些功能。

配置连接和资源

现在,使用已安装的包配置与现有 Azure Cosmos DB for NoSQL 帐户的连接。

  1. 导入CosmosClient类型和BulkOperationTypePatchOperationType支持类型。

    const { BulkOperationType, PatchOperationType, CosmosClient } = require('@azure/cosmos');
    
  2. 创建 CosmosClient 类的新实例,其传入帐户的相应凭据。

    const client = new CosmosClient({
        endpoint: '<azure-cosmos-db-nosql-account-endpoint>',
        credential
    });
    
  3. 为指向现有数据库和容器资源的指针创建常量。

    const database = client.database('<database-name>');
    
    const container = database.container('<container-name>');
    

批量创建多更新插入操作

首先执行一个批量操作来更新插入两项。 这两个项目都由项数组 OperationInput 组成。 UpsertCreate 操作必须至少包括唯一标识符(id)和 分区键 字段。

备注

在此示例中,分区键是 /category 字段。

  1. 为要更新插入容器的项创建两个常量。

    const yambdaSurfboard = {
        id: 'aaaaaaaa-0000-1111-2222-bbbbbbbbbbbb',
        category: 'gear-surf-surfboards',
        name: 'Yamba Surfboard',
        quantity: 12,
        price: 850.0,
        clearance: false
    };
    
    const kiamaClassicSurfboard = {
        id: 'bbbbbbbb-1111-2222-3333-cccccccccccc',
        category: 'gear-surf-surfboards',
        name: 'Kiama Classic Surfboard',
        quantity: 25,
        price: 790.0,
        clearance: true
    };
    
  2. 使用 Upsert 对两个项目创建操作数组。 在partitionKey属性中显式指定resourceBody属性和项目本身。

    const upsertOperations = [
        {
            operationType: BulkOperationType.Upsert,
            partitionKey: 'gear-surf-surfboards',
            resourceBody: yambdaSurfboard
        },
        {
            operationType: BulkOperationType.Upsert,
            partitionKey: 'gear-surf-surfboards',
            resourceBody: kiamaClassicSurfboard
        }
    ];
    

分析批量操作响应

运行并查看批量操作的响应。 响应包含有关每个操作的元数据。

  1. 使用您的容器的executeBulkOperations属性的items方法批量执行操作。

    const response = await container.items.executeBulkOperations(upsertOperations);
    
    console.log(response);
    
  2. 分析响应对象。 响应包含成功操作的状态代码和 response 对象。 失败的操作包含 error 对象。

    npm run start
    

    备注

    若要最小化响应有效负载的大小,请设置为 contentResponseOnWriteEnabledfalse. 此标志特定于 SDK 中的批量功能和批处理功能。

    [
      {
        operationInput: {
          operationType: 'Upsert',
          id: 'aaaaaaaa-0000-1111-2222-bbbbbbbbbbbb',
          partitionKey: 'gear-surf-surfboards',
          resourceBody: { ... }
        },
        response: {
          statusCode: 201,
          eTag: '...',
          activityId: '...',
          requestCharge: 7.23,
          resourceBody: { ... },
          diagnostics: { ... },
          headers: { ... }
        }
      },
      ...
    ]
    

执行更多批量操作

(可选)可以执行其他操作,包括ReadDeletePatch。 本节中使用的所有操作都必须包括分区键字段。

提示

CreateUpsert操作都需要id分区键字段。 其余作业ReadDeleteReplacePatch只需要分区键字段。 如果未指定必填字段,作将失败。

const variousOperations = [
    {
        operationType: BulkOperationType.Read,
        id: 'bbbbbbbb-1111-2222-3333-cccccccccccc',
        partitionKey: 'gear-surf-surfboards',
    },
    {
        operationType: BulkOperationType.Delete,
        id: 'bbbbbbbb-1111-2222-3333-cccccccccccc',
        partitionKey: 'gear-surf-surfboards',
    },
    {
        operationType: BulkOperationType.Patch,
        id: 'aaaaaaaa-0000-1111-2222-bbbbbbbbbbbb',
        partitionKey: 'gear-surf-surfboards',
        resourceBody: {
            operations: [
                {
                    op: PatchOperationType.add,
                    path: '/onSale',
                    value: true
                }
            ],
        },
    }
]

const response = await container.items.executeBulkOperations(upsertOperations);