本指南中使用 Azure SDK for JavaScript 的 executeBulkOperations
功能在 Azure Cosmos DB for NoSQL 中执行批量创建和更新操作。 此功能可调整每个分区的并发处理,自动处理每项操作的重试逻辑,并执行大量操作(100+)。 此功能通过在调用成功且不受限制时增加并发性并在发生限制时缩减来自动动态调整并发性。
一个现有的 Azure Cosmos DB for NoSQL 帐户
- 如果没有现有帐户, 请创建新帐户。
Node.js 22.x 或更高版本
首先,使用初始项目和所有开发人员依赖项创建开发环境。 对于此项目,你将把来自@azure/cosmos
的 安装为开发依赖项。
在空文件夹中打开终端。
初始化新的 Node.js 项目。
npm init
使用此内容更新新生成的
package.json
文件。{ "main": "app.js", "scripts": { "start": "node app.js" } }
从节点包管理器 (npm) 安装
@azure/cosmos
包。npm install --save @azure/cosmos
重要
批量执行功能在 Azure SDK for JavaScript 中的库版本 4.3 或更高版本
@azure/cosmos
中可用。 如果使用的是旧版 SDK,则必须迁移到较新版本才能使用这些功能。
现在,使用已安装的包配置与现有 Azure Cosmos DB for NoSQL 帐户的连接。
导入
CosmosClient
类型和BulkOperationType
及PatchOperationType
支持类型。const { BulkOperationType, PatchOperationType, CosmosClient } = require('@azure/cosmos');
创建
CosmosClient
类的新实例,其传入帐户的相应凭据。const client = new CosmosClient({ endpoint: '<azure-cosmos-db-nosql-account-endpoint>', credential });
为指向现有数据库和容器资源的指针创建常量。
const database = client.database('<database-name>'); const container = database.container('<container-name>');
首先执行一个批量操作来更新插入两项。 这两个项目都由项数组 OperationInput
组成。
Upsert
和 Create
操作必须至少包括唯一标识符(id
)和 分区键 字段。
备注
在此示例中,分区键是 /category
字段。
为要更新插入容器的项创建两个常量。
const yambdaSurfboard = { id: 'aaaaaaaa-0000-1111-2222-bbbbbbbbbbbb', category: 'gear-surf-surfboards', name: 'Yamba Surfboard', quantity: 12, price: 850.0, clearance: false }; const kiamaClassicSurfboard = { id: 'bbbbbbbb-1111-2222-3333-cccccccccccc', category: 'gear-surf-surfboards', name: 'Kiama Classic Surfboard', quantity: 25, price: 790.0, clearance: true };
使用
Upsert
对两个项目创建操作数组。 在partitionKey
属性中显式指定resourceBody
属性和项目本身。const upsertOperations = [ { operationType: BulkOperationType.Upsert, partitionKey: 'gear-surf-surfboards', resourceBody: yambdaSurfboard }, { operationType: BulkOperationType.Upsert, partitionKey: 'gear-surf-surfboards', resourceBody: kiamaClassicSurfboard } ];
运行并查看批量操作的响应。 响应包含有关每个操作的元数据。
使用您的容器的
executeBulkOperations
属性的items
方法批量执行操作。const response = await container.items.executeBulkOperations(upsertOperations); console.log(response);
分析响应对象。 响应包含成功操作的状态代码和
response
对象。 失败的操作包含error
对象。npm run start
备注
若要最小化响应有效负载的大小,请设置为
contentResponseOnWriteEnabled
false
. 此标志特定于 SDK 中的批量功能和批处理功能。[ { operationInput: { operationType: 'Upsert', id: 'aaaaaaaa-0000-1111-2222-bbbbbbbbbbbb', partitionKey: 'gear-surf-surfboards', resourceBody: { ... } }, response: { statusCode: 201, eTag: '...', activityId: '...', requestCharge: 7.23, resourceBody: { ... }, diagnostics: { ... }, headers: { ... } } }, ... ]
(可选)可以执行其他操作,包括Read
、Delete
和Patch
。 本节中使用的所有操作都必须包括分区键字段。
提示
Create
和Upsert
操作都需要id
和分区键字段。 其余作业Read
、Delete
、Replace
和Patch
只需要分区键字段。 如果未指定必填字段,作将失败。
const variousOperations = [
{
operationType: BulkOperationType.Read,
id: 'bbbbbbbb-1111-2222-3333-cccccccccccc',
partitionKey: 'gear-surf-surfboards',
},
{
operationType: BulkOperationType.Delete,
id: 'bbbbbbbb-1111-2222-3333-cccccccccccc',
partitionKey: 'gear-surf-surfboards',
},
{
operationType: BulkOperationType.Patch,
id: 'aaaaaaaa-0000-1111-2222-bbbbbbbbbbbb',
partitionKey: 'gear-surf-surfboards',
resourceBody: {
operations: [
{
op: PatchOperationType.add,
path: '/onSale',
value: true
}
],
},
}
]
const response = await container.items.executeBulkOperations(upsertOperations);