共用方式為

从 CouchBase 迁移到 Azure Cosmos DB for NoSQL

Azure Cosmos DB 是可缩放的多区域分布式完全托管型数据库。 它保证您可以以低延迟访问您的数据。 若要详细了解 Azure Cosmos DB,请参阅概述一文。 本文介绍如何将连接到 Couchbase 的 Java 应用程序迁移到 Azure Cosmos DB 中 NoSQL 帐户的 API。

命名法的差异

以下是与 Couchbase 相比,Azure Cosmos DB 中的工作方式不同的主要功能:

Couchbase Azure Cosmos DB
Couchbase 服务器 帐户
数据库
容器/集合
JSON 文档 项目/文档

主要差异

  • Azure Cosmos DB 在文档中具有“ID”字段,而 Couchbase 将 ID 作为存储桶的一部分。 “ID”字段在整个分区中是唯一的。

  • Azure Cosmos DB 使用分区或分片技术进行缩放。 这意味着将数据拆分为多个分片/分区。 这些分区/分片是根据提供的分区键属性创建的。 您可以选择分区键来分别优化读取和写入操作,或者同时优化读取/写入操作。 若要了解详细信息,请参阅 分区 文章。

  • 在 Azure Cosmos DB 中,顶级层次结构不需要表示集合,因为集合名称已存在。 此功能使 JSON 结构更简单。 以下示例显示了 Couchbase 和 Azure Cosmos DB 之间的数据模型差异:

    Couchbase:文档 ID = “99FF4444”

    {
      "TravelDocument":
      {
        "Country":"India",
        "Validity" : "2022-09-01",
        "Person":
        {
          "Name": "Manish",
          "Address": "AB Road, City-z"
        },
        "Visas":
        [
          {
          "Country":"India",
          "Type":"Multi-Entry",
          "Validity":"2022-09-01"
          },
          {
          "Country":"US",
          "Type":"Single-Entry",
          "Validity":"2022-08-01"
          }
        ]
      }
    }
    

    Azure Cosmos DB:请参阅文档中的“ID”,如图所示

    {
      "id" : "99FF4444",
    
      "Country":"India",
       "Validity" : "2022-09-01",
        "Person":
        {
          "Name": "Manish",
          "Address": "AB Road, City-z"
        },
        "Visas":
        [
          {
          "Country":"India",
          "Type":"Multi-Entry",
          "Validity":"2022-09-01"
          },
          {
          "Country":"US",
          "Type":"Single-Entry",
          "Validity":"2022-08-01"
          }
        ]
      }
    

Java SDK 支持

Azure Cosmos DB 具有以下软件开发工具包(SDK),以支持不同的 Java 框架:

  • 异步 SDK
  • Spring Boot SDK

以下各节介绍了何时使用这些 SDK。 假设有三种类型的工作负荷:

Couchbase 作为文档存储库和基于 Spring Data 的自定义查询

如果要迁移的工作负荷基于 Spring Boot 的 SDK,则可以使用以下步骤:

  1. 将父项目添加到 POM.xml 文件中:

    <parent>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-parent</artifactId>
       <version>2.1.5.RELEASE</version>
       <relativePath/>
    </parent>
    
  2. 将属性添加到 POM.xml 文件:

    <azure.version>2.1.6</azure.version>
    
  3. 将依赖项添加到 POM.xml 文件:

    <dependency>
        <groupId>com.microsoft.azure</groupId>
        <artifactId>azure-cosmosdb-spring-boot-starter</artifactId>
        <version>2.1.6</version>
    </dependency>
    
  4. 在资源下添加应用程序属性并指定以下内容。 请确保替换 URL、密钥和数据库名称参数:

       azure.cosmosdb.uri=<your-cosmosDB-URL>
       azure.cosmosdb.key=<your-cosmosDB-key>
       azure.cosmosdb.database=<your-cosmosDB-dbName>
    
  5. 定义模型中集合的名称。 还可以指定进一步的批注。 例如,ID、分区键,用于显式表示它们:

    @Document(collection = "mycollection")
        public class User {
            @id
            private String id;
            private String firstName;
            @PartitionKey
            private String lastName;
        }
    

下面是 CRUD 操作的代码片段:

插入和更新操作

其中 _repo 是存储库的对象, 文档 是 POJO 类的对象。 您可以使用 .save 插入或更新(如果找到具有指定 ID 的文档,则更新)。 以下代码片段演示如何插入或更新文档对象:

_repo.save(doc);

删除操作

请考虑以下代码片段,其中 doc 对象必须具有 ID 和分区键才能查找和删除该对象:

_repo.delete(doc);

读取操作

可以使用或不指定分区键来读取文档。 如果未指定分区键,则会将其视为跨分区查询。 请考虑以下代码示例,第一个示例将使用 ID 和分区键字段执行操作。 第二个示例使用常规字段,而不指定分区键字段。

  • _repo.findByIdAndName(objDoc.getId(),objDoc.getName());
  • _repo.findAllByStatus(objDoc.getStatus());

就是这样,现在可以将应用程序与 Azure Cosmos DB 配合使用。 CouchbaseToCosmosDB-SpringCosmos GitHub 存储库中提供了本文档中所述示例的完整代码示例。

将 Couchbase 用作文档存储库并使用 N1QL 查询

N1QL 查询是在 Couchbase 中定义查询的方法。

N1QL 查询 Azure Cosmos DB 查询
SELECT META(TravelDocument.id AS id, TravelDocument.* FROM TravelDocument WHERE _type = “com.xx.xx.xx.xxx.xxx.xxxx” and country = 'India' and ANY m in Visas SATISFIES m.type == 'Multi-Entry' and m.Country IN ['印度', 不丹'] ORDER BY Validity DESC LIMIT 25 OFFSET 0 SELECT c.id,c FROM c JOIN m in c.country='India' WHERE c._type = 'com.xx.xx.xx.xxx.xxxx' and c.country = 'India' and m.type = 'Multi-Entry' and m.Country IN ('印度', 'Bhutan') ORDER BY c.Validity DESC OFFSET 0 LIMIT 25

可以在 N1QL 查询中注意到以下更改:

  • 无需使用 META 关键字或引用第一级文档。 相反,您可以创建您自己的容器引用。 在此示例中,我们将其视为“c”(可以是任何内容)。 此引用用作所有第一级字段的前缀。 例如,c.id、c.country 等。

  • 现在,您可以在子文档上执行联接,并使用专用别名(例如“m”)来引用它,而不是使用“ANY”。 为子文档创建别名后,需要使用别名。 例如,m.Country。

  • OFFSET 序列在 Azure Cosmos DB 查询中有所不同,首先需要指定 OFFSET,然后再指定 LIMIT。 如果使用自定义定义的最大查询,建议不要使用 Spring Data SDK,因为将查询传递给 Azure Cosmos DB 时,客户端可能会产生不必要的开销。 相反,我们有一个直接的异步 Java SDK,在本例中可以有效地利用它。

读取操作

通过以下步骤使用异步 Java SDK:

  1. 将以下依赖项配置为 POM.xml 文件:

    <!-- https://mvnrepository.com/artifact/com.microsoft.azure/azure-cosmosdb -->
    <dependency>
        <groupId>com.microsoft.azure</groupId>
        <artifactId>azure-cosmos</artifactId>
        <version>3.0.0</version>
    </dependency>
    
  2. 使用 ConnectionBuilder 此方法为 Azure Cosmos DB 创建连接对象,如以下示例所示。 请确保将此声明放入 bean 中,以便以下代码应仅执行一次:

    ConnectionPolicy cp=new ConnectionPolicy();
    cp.connectionMode(ConnectionMode.DIRECT);
    
    if(client==null)
       client= CosmosClient.builder()
          .endpoint(Host)//(Host, PrimaryKey, dbName, collName).Builder()
           .connectionPolicy(cp)
           .key(PrimaryKey)
           .consistencyLevel(ConsistencyLevel.EVENTUAL)
           .build();
    
    container = client.getDatabase(_dbName).getContainer(_collName);
    
  3. 若要执行查询,需要运行以下代码片段:

    Flux<FeedResponse<CosmosItemProperties>> objFlux= container.queryItems(query, fo);
    

现在,借助上述方法,可以传递多个查询并执行,而无需任何麻烦。 如果要求执行一个大型查询,可以拆分为多个查询,然后尝试以下代码片段,而不是前面的代码片段:

for(SqlQuerySpec query:queries)
{
   objFlux= container.queryItems(query, fo);
   objFlux .publishOn(Schedulers.elastic())
         .subscribe(feedResponse->
            {
               if(feedResponse.results().size()>0)
               {
                  _docs.addAll(feedResponse.results());
               }
            
            },
            Throwable::printStackTrace,latch::countDown);
   lstFlux.add(objFlux);
}
                  
      Flux.merge(lstFlux);
      latch.await();
}

使用前面的代码,可以并行运行查询,并增加分布式执行以优化。 此外,还可以运行插入和更新操作:

插入操作

若要插入文档,请运行以下代码:

Mono<CosmosItemResponse> objMono= container.createItem(doc,ro);

然后按照如下步骤订阅 Mono:

CountDownLatch latch=new CountDownLatch(1);
objMono .subscribeOn(Schedulers.elastic())
        .subscribe(resourceResponse->
        {
           if(resourceResponse.statusCode()!=successStatus)
              {
                 throw new RuntimeException(resourceResponse.toString());
              }
           },
        Throwable::printStackTrace,latch::countDown);
latch.await();

更新插入操作

Upsert操作要求您指定需要更新的文档。 为了获取完整文档,您可以使用在"读取操作"标题下提到的代码片段,然后修改所需的字段。 以下代码片段向上插入文档:

Mono<CosmosItemResponse> obs= container.upsertItem(doc, ro);

然后订阅 mono。 请参阅插入操作中的 mono 订阅代码段。

删除操作

以下代码片段将执行删除操作:

CosmosItem objItem= container.getItem(doc.Id, doc.Tenant);
Mono<CosmosItemResponse> objMono = objItem.delete(ro);

然后订阅 mono,在插入操作中参考 mono 订阅代码片段。 CouchbaseToCosmosDB-AsyncInSpring GitHub 存储库中提供了完整的代码示例。

Couchbase 作为键/值对

这是一种简单的工作负荷类型,可在其中执行查找,而不是查询。 使用以下步骤来处理键/值对:

  1. 请考虑将“/ID”作为主键,这将确保可以直接在特定分区中执行查找作。 创建集合并将“/ID”指定为分区键。

  2. 完全关闭索引功能。 由于将执行查找操作,因此承载索引开销没有意义。 若要关闭索引,请登录到 Azure 门户,转到 Azure Cosmos DB 帐户。 打开 数据资源管理器,选择 数据库容器。 打开 “缩放和设置” 选项卡,然后选择 “索引策略”。 当前索引策略如下所示:

    {
     "indexingMode": "consistent",
     "automatic": true,
     "includedPaths": [
         {
             "path": "/*"
         }
     ],
     "excludedPaths": [
         {
             "path": "/\"_etag\"/?"
         }
     ]
     }
    

    将上述索引策略替换为以下策略:

    {
     "indexingMode": "none",
     "automatic": false,
     "includedPaths": [],
     "excludedPaths": []
     }
    
  3. 使用以下代码片段创建连接对象。 连接对象(要放入 @Bean 或使其为静态):

    ConnectionPolicy cp=new ConnectionPolicy();
    cp.connectionMode(ConnectionMode.DIRECT);
    
    if(client==null)
       client= CosmosClient.builder()
          .endpoint(Host)//(Host, PrimaryKey, dbName, collName).Builder()
           .connectionPolicy(cp)
           .key(PrimaryKey)
           .consistencyLevel(ConsistencyLevel.EVENTUAL)
           .build();
    
    container = client.getDatabase(_dbName).getContainer(_collName);
    

现在可以执行 CRUD 操作,如下所示:

读取操作

若要读取该项,请使用以下代码片段:

CosmosItemRequestOptions ro=new CosmosItemRequestOptions();
ro.partitionKey(new PartitionKey(documentId));
CountDownLatch latch=new CountDownLatch(1);
      
var objCosmosItem= container.getItem(documentId, documentId);
Mono<CosmosItemResponse> objMono = objCosmosItem.read(ro);
objMono .subscribeOn(Schedulers.elastic())
        .subscribe(resourceResponse->
        {
           if(resourceResponse.item()!=null)
           {
              doc= resourceResponse.properties().toObject(UserModel.class);
           }
        },
        Throwable::printStackTrace,latch::countDown);
latch.await();

插入操作

若要插入项,可以执行以下代码:

Mono<CosmosItemResponse> objMono= container.createItem(doc,ro);

然后订阅 mono,如下所示:

CountDownLatch latch=new CountDownLatch(1);
objMono.subscribeOn(Schedulers.elastic())
      .subscribe(resourceResponse->
      {
         if(resourceResponse.statusCode()!=successStatus)
            {
               throw new RuntimeException(resourceResponse.toString());
            }
         },
      Throwable::printStackTrace,latch::countDown);
latch.await();

更新插入操作

若要更新项的值,请参阅以下代码片段:

Mono<CosmosItemResponse> obs= container.upsertItem(doc, ro);

然后订阅 mono,在插入操作中参考 mono 订阅代码片段。

删除操作

使用以下代码片段执行删除作:

CosmosItem objItem= container.getItem(id, id);
Mono<CosmosItemResponse> objMono = objItem.delete(ro);

然后订阅 mono,在插入操作中参考 mono 订阅代码片段。 CouchbaseToCosmosDB-AsyncKeyValue GitHub 存储库中提供了完整的代码示例。

数据迁移

使用 Azure 数据工厂 迁移数据。 这是迁移数据的最推荐方法。 将源配置为 Couchbase,将接收器配置为 Azure Cosmos DB for NoSQL,请参阅 Azure Azure Cosmos DB 数据工厂连接器 文章,了解详细步骤。

后续步骤