管理 Azure Cosmos DB 中的冲突解决策略
适用范围: NoSQL
对于多区域写入,当多个客户端写入同一项时,可能会发生冲突。 发生冲突时,可以通过使用不同的冲突解决策略来解决冲突。 本文介绍如何管理冲突解决策略。
提示
冲突解决策略只能在容器创建时指定,并且不得在容器创建后修改。
创建“以最后写入者为准”冲突解决策略
这些示例介绍如何使用“以最后写入者为准”冲突解决策略设置一个容器。 “以最后写入者为准”的默认路径是时间戳字段或 _ts
属性。 对于 API for NoSQL,也可以将其设置为数值类型的用户定义路径。 如果发生冲突,最高值优先。 如果路径未设置或无效,则它默认为 _ts
。 使用此策略解决的冲突不会显示在冲突源中。 此策略可供所有 API 使用。
.NET SDK
DocumentCollection lwwCollection = await createClient.CreateDocumentCollectionIfNotExistsAsync(
UriFactory.CreateDatabaseUri(this.databaseName), new DocumentCollection
{
Id = this.lwwCollectionName,
ConflictResolutionPolicy = new ConflictResolutionPolicy
{
Mode = ConflictResolutionMode.LastWriterWins,
ConflictResolutionPath = "/myCustomId",
},
});
Java V4 SDK
Java SDK V4 (Maven com.azure::azure-cosmos) 异步 API
ConflictResolutionPolicy policy = ConflictResolutionPolicy.createLastWriterWinsPolicy("/myCustomId");
CosmosContainerProperties containerProperties = new CosmosContainerProperties(container_id, partition_key);
containerProperties.setConflictResolutionPolicy(policy);
/* ...other container config... */
database.createContainerIfNotExists(containerProperties).block();
Java V2 SDK
Async Java V2 SDK (Maven com.microsoft.azure::azure-cosmosdb)
DocumentCollection collection = new DocumentCollection();
collection.setId(id);
ConflictResolutionPolicy policy = ConflictResolutionPolicy.createLastWriterWinsPolicy("/myCustomId");
collection.setConflictResolutionPolicy(policy);
DocumentCollection createdCollection = client.createCollection(databaseUri, collection, null).toBlocking().value();
Node.js/JavaScript/TypeScript SDK
const database = client.database(this.databaseName);
const { container: lwwContainer } = await database.containers.createIfNotExists(
{
id: this.lwwContainerName,
conflictResolutionPolicy: {
mode: "LastWriterWins",
conflictResolutionPath: "/myCustomId"
}
}
);
Python SDK
database = client.get_database_client(database=database_id)
lww_conflict_resolution_policy = {'mode': 'LastWriterWins', 'conflictResolutionPath': '/regionId'}
lww_container = database.create_container(id=lww_container_id, partition_key=PartitionKey(path="/id"),
conflict_resolution_policy=lww_conflict_resolution_policy)
使用存储过程创建自定义冲突解决策略
这些示例介绍如何使用自定义冲突解决策略设置一个容器。 此策略使用存储过程中的逻辑来解决冲突。 如果指定了存储过程来解决冲突,则除非指定的存储过程中出现错误,否则冲突不会显示在冲突源中。
使用容器创建策略后,需要创建存储的过程。 下面的 .NET SDK 示例显示了此工作流的示例。 仅 API for NoSQL 支持此策略。
自定义冲突解决存储过程示例
必须使用下面显示的函数签名实现自定义冲突解决存储过程。 函数名称不需要与使用容器注册存储过程时使用的名称匹配,但它确实可以简化命名。 下面介绍了此存储过程必须实现的参数。
- incomingItem:在生成冲突的提交中插入或更新的项。 对于删除操作为 null。
- existingItem:当前已提交的项。 此值在更新中为非 null,对于插入或删除是 null。
- isTombstone:指示 incomingItem 是否与以前删除的项冲突的布尔值。 如果为 true,existingItem 也为 null。
- conflictingItems:容器中所有项目的已提交版本的数组,与 ID 上的 incomingItem 或唯一索引属性冲突。
重要
与任何存储过程一样,自定义冲突解决过程可以访问具有相同分区键的任何数据,并可以执行任何插入、更新或删除操作来解决冲突。
此存储过程示例通过从 /myCustomId
路径中选择最小值来解决冲突。
function resolver(incomingItem, existingItem, isTombstone, conflictingItems) {
var collection = getContext().getCollection();
if (!incomingItem) {
if (existingItem) {
collection.deleteDocument(existingItem._self, {}, function (err, responseOptions) {
if (err) throw err;
});
}
} else if (isTombstone) {
// delete always wins.
} else {
if (existingItem) {
if (incomingItem.myCustomId > existingItem.myCustomId) {
return; // existing item wins
}
}
var i;
for (i = 0; i < conflictingItems.length; i++) {
if (incomingItem.myCustomId > conflictingItems[i].myCustomId) {
return; // existing conflict item wins
}
}
// incoming item wins - clear conflicts and replace existing with incoming.
tryDelete(conflictingItems, incomingItem, existingItem);
}
function tryDelete(documents, incoming, existing) {
if (documents.length > 0) {
collection.deleteDocument(documents[0]._self, {}, function (err, responseOptions) {
if (err) throw err;
documents.shift();
tryDelete(documents, incoming, existing);
});
} else if (existing) {
collection.replaceDocument(existing._self, incoming,
function (err, documentCreated) {
if (err) throw err;
});
} else {
collection.createDocument(collection.getSelfLink(), incoming,
function (err, documentCreated) {
if (err) throw err;
});
}
}
}
.NET SDK
DocumentCollection udpCollection = await createClient.CreateDocumentCollectionIfNotExistsAsync(
UriFactory.CreateDatabaseUri(this.databaseName), new DocumentCollection
{
Id = this.udpCollectionName,
ConflictResolutionPolicy = new ConflictResolutionPolicy
{
Mode = ConflictResolutionMode.Custom,
ConflictResolutionProcedure = string.Format("dbs/{0}/colls/{1}/sprocs/{2}", this.databaseName, this.udpCollectionName, "resolver"),
},
});
//Create the stored procedure
await clients[0].CreateStoredProcedureAsync(
UriFactory.CreateStoredProcedureUri(this.databaseName, this.udpCollectionName, "resolver"), new StoredProcedure
{
Id = "resolver",
Body = File.ReadAllText(@"resolver.js")
});
Java V4 SDK
Java SDK V4 (Maven com.azure::azure-cosmos) 异步 API
ConflictResolutionPolicy policy = ConflictResolutionPolicy.createCustomPolicy("resolver");
CosmosContainerProperties containerProperties = new CosmosContainerProperties(container_id, partition_key);
containerProperties.setConflictResolutionPolicy(policy);
/* ...other container config... */
database.createContainerIfNotExists(containerProperties).block();
Java V2 SDK
Async Java V2 SDK (Maven com.microsoft.azure::azure-cosmosdb)
DocumentCollection collection = new DocumentCollection();
collection.setId(id);
ConflictResolutionPolicy policy = ConflictResolutionPolicy.createCustomPolicy("resolver");
collection.setConflictResolutionPolicy(policy);
DocumentCollection createdCollection = client.createCollection(databaseUri, collection, null).toBlocking().value();
创建容器后,必须创建 resolver
存储过程。
Node.js/JavaScript/TypeScript SDK
const database = client.database(this.databaseName);
const { container: udpContainer } = await database.containers.createIfNotExists(
{
id: this.udpContainerName,
conflictResolutionPolicy: {
mode: "Custom",
conflictResolutionProcedure: `dbs/${this.databaseName}/colls/${
this.udpContainerName
}/sprocs/resolver`
}
}
);
创建容器后,必须创建 resolver
存储过程。
Python SDK
database = client.get_database_client(database=database_id)
udp_custom_resolution_policy = {'mode': 'Custom' }
udp_container = database.create_container(id=udp_container_id, partition_key=PartitionKey(path="/id"),
conflict_resolution_policy=udp_custom_resolution_policy)
创建容器后,必须创建 resolver
存储过程。
创建自定义冲突解决策略
这些示例介绍如何使用自定义冲突解决策略设置一个容器。 使用此实现,每个冲突都将显示在冲突源中。 由你单独处理冲突源中的冲突。
.NET SDK
DocumentCollection manualCollection = await createClient.CreateDocumentCollectionIfNotExistsAsync(
UriFactory.CreateDatabaseUri(this.databaseName), new DocumentCollection
{
Id = this.manualCollectionName,
ConflictResolutionPolicy = new ConflictResolutionPolicy
{
Mode = ConflictResolutionMode.Custom,
},
});
Java V4 SDK
Java SDK V4 (Maven com.azure::azure-cosmos) 异步 API
ConflictResolutionPolicy policy = ConflictResolutionPolicy.createCustomPolicy();
CosmosContainerProperties containerProperties = new CosmosContainerProperties(container_id, partition_key);
containerProperties.setConflictResolutionPolicy(policy);
/* ...other container config... */
database.createContainerIfNotExists(containerProperties).block();
Java V2 SDK
Async Java V2 SDK (Maven com.microsoft.azure::azure-cosmosdb)
DocumentCollection collection = new DocumentCollection();
collection.setId(id);
ConflictResolutionPolicy policy = ConflictResolutionPolicy.createCustomPolicy();
collection.setConflictResolutionPolicy(policy);
DocumentCollection createdCollection = client.createCollection(databaseUri, collection, null).toBlocking().value();
Node.js/JavaScript/TypeScript SDK
const database = client.database(this.databaseName);
const {
container: manualContainer
} = await database.containers.createIfNotExists({
id: this.manualContainerName,
conflictResolutionPolicy: {
mode: "Custom"
}
});
Python SDK
database = client.get_database_client(database=database_id)
manual_resolution_policy = {'mode': 'Custom'}
manual_container = database.create_container(id=manual_container_id, partition_key=PartitionKey(path="/id"),
conflict_resolution_policy=manual_resolution_policy)
从冲突源读取
这些示例介绍如何从容器的冲突源读取。 冲突可能仅出于以下几个原因显示在冲突源中:
- 冲突未自动解决
- 冲突导致指定存储过程出错
- 冲突解决策略设置为“自定义”,并且不指定用于处理冲突的存储过程
.NET SDK
FeedResponse<Conflict> conflicts = await delClient.ReadConflictFeedAsync(this.collectionUri);
Java SDK
Java V4 SDK (Maven com.azure::azure-cosmos)
int requestPageSize = 3;
CosmosQueryRequestOptions options = new CosmosQueryRequestOptions();
CosmosPagedFlux<CosmosConflictProperties> conflictReadFeedFlux = container.readAllConflicts(options);
conflictReadFeedFlux.byPage(requestPageSize).toIterable().forEach(page -> {
int expectedNumberOfConflicts = 0;
int numberOfResults = 0;
Iterator<CosmosConflictProperties> pageIt = page.getElements().iterator();
while (pageIt.hasNext()) {
CosmosConflictProperties conflictProperties = pageIt.next();
// Read the conflict and committed item
CosmosAsyncConflict conflict = container.getConflict(conflictProperties.getId());
CosmosConflictResponse response = conflict.read(new CosmosConflictRequestOptions()).block();
// response.
}
});
Node.js/JavaScript/TypeScript SDK
const container = client
.database(this.databaseName)
.container(this.lwwContainerName);
const { result: conflicts } = await container.conflicts.readAll().toArray();
Python
conflicts_iterator = iter(container.list_conflicts())
conflict = next(conflicts_iterator, None)
while conflict:
# Do something with conflict
conflict = next(conflicts_iterator, None)
后续步骤
了解以下 Azure Cosmos DB 概念: