从容器实例开始,并调用 CreateTransactionalBatch:
PartitionKey partitionKey = new PartitionKey("road-bikes");
TransactionalBatch batch = container.CreateTransactionalBatch(partitionKey);
接下来,将多个操作添加到批处理:
Product bike = new (
    id: "68719520766",
    category: "road-bikes",
    name: "Chropen Road Bike"
);
batch.CreateItem<Product>(bike);
Part part = new (
    id: "68719519885",
    category: "road-bikes",
    name: "Tronosuros Tire",
    productId: bike.id
);
batch.CreateItem<Part>(part);
最后,对批处理调用 ExecuteAsync:
using TransactionalBatchResponse response = await batch.ExecuteAsync();
收到响应后,检查响应是否成功。 如果响应显示成功,请提取结果:
if (response.IsSuccessStatusCode)
{
    TransactionalBatchOperationResult<Product> productResponse;
    productResponse = response.GetOperationResultAtIndex<Product>(0);
    Product productResult = productResponse.Resource;
    TransactionalBatchOperationResult<Part> partResponse;
    partResponse = response.GetOperationResultAtIndex<Part>(1);
    Part partResult = partResponse.Resource;
}
重要
如果失败,失败的作的状态代码为相应的错误。 所有其他作都具有 424 状态代码(失败依赖项)。 如果操作因尝试创建已存在的项而失败,则返回状态代码 409 (冲突)。 通过状态代码可以识别事务失败的原因。
 
调用 CosmosBatch.createCosmosBatch:
PartitionKey partitionKey = new PartitionKey("road-bikes");
CosmosBatch batch = CosmosBatch.createCosmosBatch(partitionKey);
接下来,将多个操作添加到批处理:
Product bike = new Product();
bike.setId("68719520766");
bike.setCategory("road-bikes");
bike.setName("Chropen Road Bike");
batch.createItemOperation(bike);
Part part = new Part();
part.setId("68719519885");
part.setCategory("road-bikes");
part.setName("Tronosuros Tire");
part.setProductId(bike.getId());
batch.createItemOperation(part);
最后,使用容器实例通过批处理调用 executeCosmosBatch:
CosmosBatchResponse response = container.executeCosmosBatch(batch);
收到响应后,检查响应是否成功。 如果响应显示成功,请提取结果:
if (response.isSuccessStatusCode())
{
    List<CosmosBatchOperationResult> results = response.getResults();
}
重要
如果失败,失败的作的状态代码为相应的错误。 所有其他作都具有 424 状态代码(失败依赖项)。 如果操作因尝试创建已存在的项而失败,则返回状态代码 409 (冲突)。 通过状态代码可以识别事务失败的原因。
 
获取或创建容器实例:
container = database.create_container_if_not_exists(id="batch_container",
                                                        partition_key=PartitionKey(path='/category'))
在 Python 中,事务批处理作类似于单一作 API,是包含operation_type_string、args_tuple batch_operation_kwargs_dictionary元组。 以下示例项演示批处理作功能:
create_demo_item = {
    "id": "68719520766",
    "category": "road-bikes",
    "name": "Chropen Road Bike"
}
# for demo, assume that this item already exists in the container. 
# the item id will be used for read operation in the batch
read_demo_item1 = {
    "id": "68719519884",
    "category": "road-bikes",
    "name": "Tronosuros Tire",
    "productId": "68719520766"
}
# for demo, assume that this item already exists in the container. 
# the item id will be used for read operation in the batch
read_demo_item2 = {
    "id": "68719519886",
    "category": "road-bikes",
    "name": "Tronosuros Tire",
    "productId": "68719520766"
}
# for demo, assume that this item already exists in the container. 
# the item id will be used for read operation in the batch
read_demo_item3 = {
    "id": "68719519887",
    "category": "road-bikes",
    "name": "Tronosuros Tire",
    "productId": "68719520766"
}
# for demo, we'll upsert the item with id 68719519885
upsert_demo_item = {
    "id": "68719519885",
    "category": "road-bikes",
    "name": "Tronosuros Tire Upserted",
    "productId": "68719520768"
}
# for replace demo, we'll replace the read_demo_item2 with this item
replace_demo_item = {
    "id": "68719519886",
    "category": "road-bikes",
    "name": "Tronosuros Tire replaced",
    "productId": "68719520769"
}
# for replace with etag match demo, we'll replace the read_demo_item3 with this item
# The use of etags and if-match/if-none-match options allows users to run conditional replace operations
# based on the etag value passed. When using if-match, the request will only succeed if the item's latest etag
# matches the passed in value. For more on optimistic concurrency control, see the link below:
# /cosmos-db/nosql/database-transactions-optimistic-concurrency
replace_demo_item_if_match_operation = {
    "id": "68719519887",
    "category": "road-bikes",
    "name": "Tronosuros Tireh",
    "wasReplaced": "Replaced based on etag match"
    "productId": "68719520769"
}
准备要添加到批处理的操作:
create_item_operation = ("create", (create_demo_item,), {})
read_item_operation = ("read", ("68719519884",), {})
delete_item_operation = ("delete", ("68719519885",), {})
upsert_item_operation = ("upsert", (upsert_demo_item,), {})
replace_item_operation = ("replace", ("68719519886", replace_demo_item), {})
replace_item_if_match_operation = ("replace",
                                       ("68719519887", replace_demo_item_if_match_operation),
                                       {"if_match_etag": container.client_connection.last_response_headers.get("etag")})
将操作添加到批处理:
batch_operations = [
        create_item_operation,
        read_item_operation,
        delete_item_operation,
        upsert_item_operation,
        replace_item_operation,
        replace_item_if_match_operation
    ]
最后,执行批处理:
try:
    # Run that list of operations
    batch_results = container.execute_item_batch(batch_operations=batch_operations, partition_key="road_bikes")
    # Batch results are returned as a list of item operation results - or raise a CosmosBatchOperationError if
    # one of the operations failed within your batch request.
    print("\nResults for the batch operations: {}\n".format(batch_results))
except exceptions.CosmosBatchOperationError as e:
        error_operation_index = e.error_index
        error_operation_response = e.operation_responses[error_operation_index]
        error_operation = batch_operations[error_operation_index]
        print("\nError operation: {}, error operation response: {}\n".format(error_operation, error_operation_response))
    # [END handle_batch_error]
注释
如果在批处理中使用修补作和replace_if_match_etag作:批处理作 kwargs 字典受到限制,并且只采用三个不同的键值。 若要在批处理中使用条件修补,可以使用 filter_predicate 键进行修补操作;如果想要将 etag 用于任何操作,也可以使用 if_match_etag/if_none_match_etag 键。
batch_operations = [
       ("replace", (item_id, item_body), {"if_match_etag": etag}),
       ("patch", (item_id, operations), {"filter_predicate": filter_predicate, "if_none_match_etag": etag}),
   ]
 
重要
如果失败,失败的作的状态代码为相应的错误。 所有其他作都具有 424 状态代码(失败依赖项)。 如果操作因尝试创建已存在的项而失败,则返回状态代码 409 (冲突)。 通过状态代码可以识别事务失败的原因。
 
调用 NewTransactionalBatch。
pk := azcosmos.NewPartitionKeyString("road-bikes")
batch := container.NewTransactionalBatch(pk)
接下来,将多个操作添加到批处理:
type Bike struct {
	ID       string `json:"id"`
	Category string `json:"category"`
	Name     string `json:"name"`
}
bike := Bike{ID: "68719520766", Category: "road-bikes", Name: "Chropen Road Bike"}
bikeItem, _ := json.Marshal(bike)
batch.CreateItem(bikeItem, nil)
type BikePart struct {
	ID        string `json:"id"`
	Category  string `json:"category"`
	Name      string `json:"name"`
	ProductID string `json:"productID"`
}
part := BikePart{ID: "68719519885", Category: "road-bikes", Name: "Tronosuros Tire", ProductID: bike.ID}
bikePartItem, _ := json.Marshal(part)
batch.CreateItem(bikePartItem, nil)
最后,使用容器实例通过批处理调用 ExecuteTransactionalBatch :
resp, _ := container.ExecuteTransactionalBatch(context.Background(), batch, nil)
if resp.Success {
	// execute conditional logic
}
重要
如果失败,失败的作的状态代码为相应的错误。 所有其他作都具有 424 状态代码(失败依赖项)。 如果操作因尝试创建已存在的项而失败,则返回状态代码 409 (冲突)。 通过状态代码可以识别事务失败的原因。