创建事务性批处理操作时,请从容器实例开始并调用 CreateTransactionalBatch:
PartitionKey partitionKey = new PartitionKey("road-bikes");
TransactionalBatch batch = container.CreateTransactionalBatch(partitionKey);
接下来,将多个操作添加到批处理:
Product bike = new (
id: "68719520766",
category: "road-bikes",
name: "Chropen Road Bike"
);
batch.CreateItem<Product>(bike);
Part part = new (
id: "68719519885",
category: "road-bikes",
name: "Tronosuros Tire",
productId: bike.id
);
batch.CreateItem<Part>(part);
最后,对批处理调用 ExecuteAsync:
using TransactionalBatchResponse response = await batch.ExecuteAsync();
收到响应后,检查响应是否成功。 如果响应指示成功,请提取结果:
if (response.IsSuccessStatusCode)
{
TransactionalBatchOperationResult<Product> productResponse;
productResponse = response.GetOperationResultAtIndex<Product>(0);
Product productResult = productResponse.Resource;
TransactionalBatchOperationResult<Part> partResponse;
partResponse = response.GetOperationResultAtIndex<Part>(1);
Part partResult = partResponse.Resource;
}
重要
如果失败,则失败的操作会有其相应错误的状态代码。 其他所有操作都会有 424 状态代码(失败的依赖项)。 如果操作因尝试创建已存在的项而失败,则返回状态代码 409 (冲突)。 可通过状态代码查明导致事务失败的原因。
创建事务批处理操作时,请调用 CosmosBatch.createCosmosBatch:
PartitionKey partitionKey = new PartitionKey("road-bikes");
CosmosBatch batch = CosmosBatch.createCosmosBatch(partitionKey);
接下来,将多个操作添加到批处理:
Product bike = new Product();
bike.setId("68719520766");
bike.setCategory("road-bikes");
bike.setName("Chropen Road Bike");
batch.createItemOperation(bike);
Part part = new Part();
part.setId("68719519885");
part.setCategory("road-bikes");
part.setName("Tronosuros Tire");
part.setProductId(bike.getId());
batch.createItemOperation(part);
最后,使用容器实例通过批处理调用 executeCosmosBatch:
CosmosBatchResponse response = container.executeCosmosBatch(batch);
收到响应后,检查响应是否成功。 如果响应指示成功,请提取结果:
if (response.isSuccessStatusCode())
{
List<CosmosBatchOperationResult> results = response.getResults();
}
重要
如果失败,则失败的操作会有其相应错误的状态代码。 其他所有操作都会有 424 状态代码(失败的依赖项)。 如果操作因尝试创建已存在的项而失败,则返回状态代码 409 (冲突)。 可通过状态代码查明导致事务失败的原因。
获取或创建容器实例:
container = database.create_container_if_not_exists(id="batch_container",
partition_key=PartitionKey(path='/road_bikes'))
在 Python 中,事务性批处理操作看起来与单一操作 API 非常相似,并且是包含(operation_type_string、args_tuple、batch_operation_kwargs_dictionary)的元组。 下面是将用于演示批处理操作功能的示例项:
create_demo_item = {
"id": "68719520766",
"category": "road-bikes",
"name": "Chropen Road Bike"
}
# for demo, assume that this item already exists in the container.
# the item id will be used for read operation in the batch
read_demo_item1 = {
"id": "68719519884",
"category": "road-bikes",
"name": "Tronosuros Tire",
"productId": "68719520766"
}
# for demo, assume that this item already exists in the container.
# the item id will be used for read operation in the batch
read_demo_item2 = {
"id": "68719519886",
"category": "road-bikes",
"name": "Tronosuros Tire",
"productId": "68719520766"
}
# for demo, assume that this item already exists in the container.
# the item id will be used for read operation in the batch
read_demo_item3 = {
"id": "68719519887",
"category": "road-bikes",
"name": "Tronosuros Tire",
"productId": "68719520766"
}
# for demo, we'll upsert the item with id 68719519885
upsert_demo_item = {
"id": "68719519885",
"category": "road-bikes",
"name": "Tronosuros Tire Upserted",
"productId": "68719520768"
}
# for replace demo, we'll replace the read_demo_item2 with this item
replace_demo_item = {
"id": "68719519886",
"category": "road-bikes",
"name": "Tronosuros Tire replaced",
"productId": "68719520769"
}
# for replace with etag match demo, we'll replace the read_demo_item3 with this item
# The use of etags and if-match/if-none-match options allows users to run conditional replace operations
# based on the etag value passed. When using if-match, the request will only succeed if the item's latest etag
# matches the passed in value. For more on optimistic concurrency control, see the link below:
# /cosmos-db/nosql/database-transactions-optimistic-concurrency
replace_demo_item_if_match_operation = {
"id": "68719519887",
"category": "road-bikes",
"name": "Tronosuros Tireh",
"wasReplaced": "Replaced based on etag match"
"productId": "68719520769"
}
准备要添加到批处理的操作:
create_item_operation = ("create", (create_demo_item,), {})
read_item_operation = ("read", ("68719519884",), {})
delete_item_operation = ("delete", ("68719519885",), {})
upsert_item_operation = ("upsert", (upsert_demo_item,), {})
replace_item_operation = ("replace", ("68719519886", replace_demo_item), {})
replace_item_if_match_operation = ("replace",
("68719519887", replace_demo_item_if_match_operation),
{"if_match_etag": container.client_connection.last_response_headers.get("etag")})
将操作添加到批处理:
batch_operations = [
create_item_operation,
read_item_operation,
delete_item_operation,
upsert_item_operation,
replace_item_operation,
replace_item_if_match_operation
]
最后,执行批处理:
try:
# Run that list of operations
batch_results = container.execute_item_batch(batch_operations=batch_operations, partition_key="road_bikes")
# Batch results are returned as a list of item operation results - or raise a CosmosBatchOperationError if
# one of the operations failed within your batch request.
print("\nResults for the batch operations: {}\n".format(batch_results))
except exceptions.CosmosBatchOperationError as e:
error_operation_index = e.error_index
error_operation_response = e.operation_responses[error_operation_index]
error_operation = batch_operations[error_operation_index]
print("\nError operation: {}, error operation response: {}\n".format(error_operation, error_operation_response))
# [END handle_batch_error]
在批处理中使用 patch 操作和 replace_if_match_etag 操作时的注意事项
批处理操作 kwargs 字典存在限制,总共只采用三个不同的键值。 若要在批处理中使用条件修补,可以使用 filter_predicate 键进行修补操作;如果想要将 etag 用于任何操作,也可以使用 if_match_etag/if_none_match_etag 键。
batch_operations = [
("replace", (item_id, item_body), {"if_match_etag": etag}),
("patch", (item_id, operations), {"filter_predicate": filter_predicate, "if_none_match_etag": etag}),
]
如果失败,则失败的操作会有其相应错误的状态代码。 其他所有操作都会有 424 状态代码(失败的依赖项)。 如果操作因尝试创建已存在的项而失败,则返回状态代码 409 (冲突)。 可通过状态代码查明导致事务失败的原因。
事务性批处理操作如何执行
执行事务性批处理时,事务性批处理中的所有操作都会进行分组,序列化为单个有效负载,并作为单个请求发送到 Azure Cosmos DB 服务。
服务接收请求并在事务作用域内执行所有操作,然后使用相同的序列化协议返回响应。 此响应是成功或失败,并为每个操作提供各项操作响应。
SDK 会将响应公开,以便你验证结果并根据需要提取每个内部操作结果。
限制
目前有两个已知限制:
- Azure Cosmos DB 请求大小限制将事务性批处理有效负载的大小限制为不超过 2 MB,最大执行时间为 5 秒。
- 为了确保性能符合预期并满足 SLA,每个事务性批处理的当前限制为 100 个操作。
后续步骤