本指南中使用 Azure SDK for JavaScript 的 executeBulkOperations 功能在 Azure Cosmos DB for NoSQL 中执行批量创建和更新操作。 此功能可调整每个分区的并发处理,自动处理每项操作的重试逻辑,并执行大量操作(100+)。 此功能通过在调用成功且不受限制时增加并发性并在发生限制时缩减来自动动态调整并发性。
先决条件
一个现有的 Azure Cosmos DB for NoSQL 帐户
- 如果没有现有帐户, 请创建新帐户。
Node.js 22.x 或更高版本
初始化开发环境
首先,使用初始项目和所有开发人员依赖项创建开发环境。 对于此项目,你将把来自@azure/cosmos 的 安装为开发依赖项。
在空文件夹中打开终端。
初始化新的 Node.js 项目。
npm init使用此内容更新新生成的
package.json文件。{ "main": "app.js", "scripts": { "start": "node app.js" } }从节点包管理器 (npm) 安装
@azure/cosmos包。npm install --save @azure/cosmos重要
批量执行功能在 Azure SDK for JavaScript 中的库版本 4.3 或更高版本
@azure/cosmos中可用。 如果使用的是旧版 SDK,则必须迁移到较新版本才能使用这些功能。
配置连接和资源
现在,使用已安装的包配置与现有 Azure Cosmos DB for NoSQL 帐户的连接。
导入
CosmosClient类型和BulkOperationType及PatchOperationType支持类型。const { BulkOperationType, PatchOperationType, CosmosClient } = require('@azure/cosmos');创建
CosmosClient类的新实例,其传入帐户的相应凭据。const client = new CosmosClient({ endpoint: '<azure-cosmos-db-nosql-account-endpoint>', credential });为指向现有数据库和容器资源的指针创建常量。
const database = client.database('<database-name>'); const container = database.container('<container-name>');
批量创建多更新插入操作
首先执行一个批量操作来更新插入两项。 这两个项目都由项数组 OperationInput 组成。
Upsert 和 Create 操作必须至少包括唯一标识符(id)和 分区键 字段。
注释
在此示例中,分区键是 /category 字段。
为要更新插入容器的项创建两个常量。
const yambdaSurfboard = { id: 'aaaaaaaa-0000-1111-2222-bbbbbbbbbbbb', category: 'gear-surf-surfboards', name: 'Yamba Surfboard', quantity: 12, price: 850.0, clearance: false }; const kiamaClassicSurfboard = { id: 'bbbbbbbb-1111-2222-3333-cccccccccccc', category: 'gear-surf-surfboards', name: 'Kiama Classic Surfboard', quantity: 25, price: 790.0, clearance: true };使用
Upsert对两个项目创建操作数组。 在partitionKey属性中显式指定resourceBody属性和项目本身。const upsertOperations = [ { operationType: BulkOperationType.Upsert, partitionKey: 'gear-surf-surfboards', resourceBody: yambdaSurfboard }, { operationType: BulkOperationType.Upsert, partitionKey: 'gear-surf-surfboards', resourceBody: kiamaClassicSurfboard } ];
分析批量操作响应
运行并查看批量操作的响应。 响应包含有关每个操作的元数据。
使用您的容器的
executeBulkOperations属性的items方法批量执行操作。const response = await container.items.executeBulkOperations(upsertOperations); console.log(response);分析响应对象。 响应包含成功操作的状态代码和
response对象。 失败的操作包含error对象。npm run start注释
若要最小化响应有效负载的大小,请设置为
contentResponseOnWriteEnabledfalse. 此标志特定于 SDK 中的批量功能和批处理功能。[ { operationInput: { operationType: 'Upsert', id: 'aaaaaaaa-0000-1111-2222-bbbbbbbbbbbb', partitionKey: 'gear-surf-surfboards', resourceBody: { ... } }, response: { statusCode: 201, eTag: '...', activityId: '...', requestCharge: 7.23, resourceBody: { ... }, diagnostics: { ... }, headers: { ... } } }, ... ]
执行更多批量操作
(可选)可以执行其他操作,包括Read、Delete和Patch。 本节中使用的所有操作都必须包括分区键字段。
小窍门
Create和Upsert操作都需要id和分区键字段。 其余作业Read、Delete、Replace和Patch只需要分区键字段。 如果未指定必填字段,作将失败。
const variousOperations = [
{
operationType: BulkOperationType.Read,
id: 'bbbbbbbb-1111-2222-3333-cccccccccccc',
partitionKey: 'gear-surf-surfboards',
},
{
operationType: BulkOperationType.Delete,
id: 'bbbbbbbb-1111-2222-3333-cccccccccccc',
partitionKey: 'gear-surf-surfboards',
},
{
operationType: BulkOperationType.Patch,
id: 'aaaaaaaa-0000-1111-2222-bbbbbbbbbbbb',
partitionKey: 'gear-surf-surfboards',
resourceBody: {
operations: [
{
op: PatchOperationType.add,
path: '/onSale',
value: true
}
],
},
}
]
const response = await container.items.executeBulkOperations(upsertOperations);