管理 Azure Cosmos DB 中的冲突解决策略

适用于 :SQL API

对于多区域写入,当多个客户端写入同一项时,可能会发生冲突。 发生冲突时,可以通过使用不同的冲突解决策略来解决冲突。 本文介绍如何管理冲突解决策略。

提示

冲突解决策略只能在容器创建时指定,并且不得在容器创建后修改。

创建“以最后写入者为准”冲突解决策略

这些示例介绍如何使用“以最后写入者为准”冲突解决策略设置一个容器。 “以最后写入者为准”的默认路径是时间戳字段或 _ts 属性。 对于 SQL API,也可以将其设置为数值类型的用户定义路径。 如果发生冲突,最高值优先。 如果路径未设置或无效,则它默认为 _ts。 使用此策略解决的冲突不会显示在冲突源中。 此策略可供所有 API 使用。

.NET SDK

DocumentCollection lwwCollection = await createClient.CreateDocumentCollectionIfNotExistsAsync(
  UriFactory.CreateDatabaseUri(this.databaseName), new DocumentCollection
  {
      Id = this.lwwCollectionName,
      ConflictResolutionPolicy = new ConflictResolutionPolicy
      {
          Mode = ConflictResolutionMode.LastWriterWins,
          ConflictResolutionPath = "/myCustomId",
      },
  });

Java V4 SDK

Java SDK V4 (Maven com.azure::azure-cosmos) 异步 API


ConflictResolutionPolicy policy = ConflictResolutionPolicy.createLastWriterWinsPolicy("/myCustomId");

CosmosContainerProperties containerProperties = new CosmosContainerProperties(container_id, partition_key);
containerProperties.setConflictResolutionPolicy(policy);
/* ...other container config... */
database.createContainerIfNotExists(containerProperties).block();

Java V2 SDK

Async Java V2 SDK (Maven com.microsoft.azure::azure-cosmosdb)

DocumentCollection collection = new DocumentCollection();
collection.setId(id);
ConflictResolutionPolicy policy = ConflictResolutionPolicy.createLastWriterWinsPolicy("/myCustomId");
collection.setConflictResolutionPolicy(policy);
DocumentCollection createdCollection = client.createCollection(databaseUri, collection, null).toBlocking().value();

Node.js/JavaScript/TypeScript SDK

const database = client.database(this.databaseName);
const { container: lwwContainer } = await database.containers.createIfNotExists(
  {
    id: this.lwwContainerName,
    conflictResolutionPolicy: {
      mode: "LastWriterWins",
      conflictResolutionPath: "/myCustomId"
    }
  }
);

Python SDK

udp_collection = {
    'id': self.udp_collection_name,
    'conflictResolutionPolicy': {
        'mode': 'LastWriterWins',
        'conflictResolutionPath': '/myCustomId'
    }
}
udp_collection = self.try_create_document_collection(
    create_client, database, udp_collection)

使用存储过程创建自定义冲突解决策略

这些示例介绍如何使用自定义冲突解决策略设置一个容器,通过存储的过程来解决冲突。 这些冲突不会显示在冲突源中,除非存储过程中存在错误。 使用容器创建策略后,需要创建存储的过程。 下面的 .NET SDK 示例演示一个示例。 此策略仅在核心 (SQL) API 上受支持。

自定义冲突解决存储过程示例

必须使用下面显示的函数签名实现自定义冲突解决存储过程。 函数名称不需要与使用容器注册存储过程时使用的名称匹配,但它确实可以简化命名。 下面介绍了此存储过程必须实现的参数。

  • incomingItem:在生成冲突的提交中插入或更新的项。 对于删除操作为 null。
  • existingItem:当前已提交的项。 此值在更新中为非 null,对于插入或删除是 null。
  • isTombstone:指示 incomingItem 是否与以前删除的项冲突的布尔值。 如果为 true,existingItem 也为 null。
  • conflictingItems:容器中所有项目的已提交版本的数组,与 ID 上的 incomingItem 或唯一索引属性冲突。

重要

与任何存储过程一样,自定义冲突解决过程可以访问具有相同分区键的任何数据,并可以执行任何插入、更新或删除操作来解决冲突。

此存储过程示例通过从 /myCustomId 路径中选择最小值来解决冲突。

function resolver(incomingItem, existingItem, isTombstone, conflictingItems) {
  var collection = getContext().getCollection();

  if (!incomingItem) {
      if (existingItem) {

          collection.deleteDocument(existingItem._self, {}, function (err, responseOptions) {
              if (err) throw err;
          });
      }
  } else if (isTombstone) {
      // delete always wins.
  } else {
      if (existingItem) {
          if (incomingItem.myCustomId > existingItem.myCustomId) {
              return; // existing item wins
          }
      }

      var i;
      for (i = 0; i < conflictingItems.length; i++) {
          if (incomingItem.myCustomId > conflictingItems[i].myCustomId) {
              return; // existing conflict item wins
          }
      }

      // incoming item wins - clear conflicts and replace existing with incoming.
      tryDelete(conflictingItems, incomingItem, existingItem);
  }

  function tryDelete(documents, incoming, existing) {
      if (documents.length > 0) {
          collection.deleteDocument(documents[0]._self, {}, function (err, responseOptions) {
              if (err) throw err;

              documents.shift();
              tryDelete(documents, incoming, existing);
          });
      } else if (existing) {
          collection.replaceDocument(existing._self, incoming,
              function (err, documentCreated) {
                  if (err) throw err;
              });
      } else {
          collection.createDocument(collection.getSelfLink(), incoming,
              function (err, documentCreated) {
                  if (err) throw err;
              });
      }
  }
}

.NET SDK

DocumentCollection udpCollection = await createClient.CreateDocumentCollectionIfNotExistsAsync(
  UriFactory.CreateDatabaseUri(this.databaseName), new DocumentCollection
  {
      Id = this.udpCollectionName,
      ConflictResolutionPolicy = new ConflictResolutionPolicy
      {
          Mode = ConflictResolutionMode.Custom,
          ConflictResolutionProcedure = string.Format("dbs/{0}/colls/{1}/sprocs/{2}", this.databaseName, this.udpCollectionName, "resolver"),
      },
  });

//Create the stored procedure
await clients[0].CreateStoredProcedureAsync(
UriFactory.CreateStoredProcedureUri(this.databaseName, this.udpCollectionName, "resolver"), new StoredProcedure
{
    Id = "resolver",
    Body = File.ReadAllText(@"resolver.js")
});

Java V4 SDK

Java SDK V4 (Maven com.azure::azure-cosmos) 异步 API


ConflictResolutionPolicy policy = ConflictResolutionPolicy.createCustomPolicy("resolver");

CosmosContainerProperties containerProperties = new CosmosContainerProperties(container_id, partition_key);
containerProperties.setConflictResolutionPolicy(policy);
/* ...other container config... */
database.createContainerIfNotExists(containerProperties).block();

Java V2 SDK

Async Java V2 SDK (Maven com.microsoft.azure::azure-cosmosdb)

DocumentCollection collection = new DocumentCollection();
collection.setId(id);
ConflictResolutionPolicy policy = ConflictResolutionPolicy.createCustomPolicy("resolver");
collection.setConflictResolutionPolicy(policy);
DocumentCollection createdCollection = client.createCollection(databaseUri, collection, null).toBlocking().value();

创建容器后,必须创建 resolver 存储过程。

Node.js/JavaScript/TypeScript SDK

const database = client.database(this.databaseName);
const { container: udpContainer } = await database.containers.createIfNotExists(
  {
    id: this.udpContainerName,
    conflictResolutionPolicy: {
      mode: "Custom",
      conflictResolutionProcedure: `dbs/${this.databaseName}/colls/${
        this.udpContainerName
      }/sprocs/resolver`
    }
  }
);

创建容器后,必须创建 resolver 存储过程。

Python SDK

udp_collection = {
    'id': self.udp_collection_name,
    'conflictResolutionPolicy': {
        'mode': 'Custom',
        'conflictResolutionProcedure': 'dbs/' + self.database_name + "/colls/" + self.udp_collection_name + '/sprocs/resolver'
    }
}
udp_collection = self.try_create_document_collection(
    create_client, database, udp_collection)

创建容器后,必须创建 resolver 存储过程。

创建自定义冲突解决策略

这些示例介绍如何使用自定义冲突解决策略设置一个容器。 这些冲突会显示在冲突源中。

.NET SDK

DocumentCollection manualCollection = await createClient.CreateDocumentCollectionIfNotExistsAsync(
  UriFactory.CreateDatabaseUri(this.databaseName), new DocumentCollection
  {
      Id = this.manualCollectionName,
      ConflictResolutionPolicy = new ConflictResolutionPolicy
      {
          Mode = ConflictResolutionMode.Custom,
      },
  });

Java V4 SDK

Java SDK V4 (Maven com.azure::azure-cosmos) 异步 API


ConflictResolutionPolicy policy = ConflictResolutionPolicy.createCustomPolicy();

CosmosContainerProperties containerProperties = new CosmosContainerProperties(container_id, partition_key);
containerProperties.setConflictResolutionPolicy(policy);
/* ...other container config... */
database.createContainerIfNotExists(containerProperties).block();

Java V2 SDK

Async Java V2 SDK (Maven com.microsoft.azure::azure-cosmosdb)

DocumentCollection collection = new DocumentCollection();
collection.setId(id);
ConflictResolutionPolicy policy = ConflictResolutionPolicy.createCustomPolicy();
collection.setConflictResolutionPolicy(policy);
DocumentCollection createdCollection = client.createCollection(databaseUri, collection, null).toBlocking().value();

Node.js/JavaScript/TypeScript SDK

const database = client.database(this.databaseName);
const {
  container: manualContainer
} = await database.containers.createIfNotExists({
  id: this.manualContainerName,
  conflictResolutionPolicy: {
    mode: "Custom"
  }
});

Python SDK

database = client.ReadDatabase("dbs/" + self.database_name)
manual_collection = {
    'id': self.manual_collection_name,
    'conflictResolutionPolicy': {
        'mode': 'Custom'
    }
}
manual_collection = client.CreateContainer(database['_self'], collection)

从冲突源读取

这些示例介绍如何从容器的冲突源读取。 只有在未自动解决冲突或使用自定义冲突策略的情况下,冲突才显示在冲突源中。

.NET SDK

FeedResponse<Conflict> conflicts = await delClient.ReadConflictFeedAsync(this.collectionUri);

Java V2 SDK

Async Java V2 SDK (Maven com.microsoft.azure::azure-cosmosdb)

FeedResponse<Conflict> response = client.readConflicts(this.manualCollectionUri, null)
                    .first().toBlocking().single();
for (Conflict conflict : response.getResults()) {
    /* Do something with conflict */
}

Node.js/JavaScript/TypeScript SDK

const container = client
  .database(this.databaseName)
  .container(this.lwwContainerName);

const { result: conflicts } = await container.conflicts.readAll().toArray();

Python

conflicts_iterator = iter(client.ReadConflicts(self.manual_collection_link))
conflict = next(conflicts_iterator, None)
while conflict:
    # Do something with conflict
    conflict = next(conflicts_iterator, None)

后续步骤

了解以下 Azure Cosmos DB 概念: