Azure Cosmos DB 是可缩放的多区域分布式完全托管型数据库。 它保证您可以以低延迟访问您的数据。 若要详细了解 Azure Cosmos DB,请参阅概述一文。 本文介绍如何将连接到 Couchbase 的 Java 应用程序迁移到 Azure Cosmos DB 中 NoSQL 帐户的 API。
命名法的差异
以下是与 Couchbase 相比,Azure Cosmos DB 中的工作方式不同的主要功能:
| Couchbase | Azure Cosmos DB |
|---|---|
| Couchbase 服务器 | 帐户 |
| 桶 | 数据库 |
| 桶 | 容器/集合 |
| JSON 文档 | 项目/文档 |
主要差异
Azure Cosmos DB 在文档中具有“ID”字段,而 Couchbase 将 ID 作为存储桶的一部分。 “ID”字段在整个分区中是唯一的。
Azure Cosmos DB 使用分区或分片技术进行缩放。 这意味着将数据拆分为多个分片/分区。 这些分区/分片是根据提供的分区键属性创建的。 您可以选择分区键来分别优化读取和写入操作,或者同时优化读取/写入操作。 若要了解详细信息,请参阅 分区 文章。
在 Azure Cosmos DB 中,顶级层次结构不需要表示集合,因为集合名称已存在。 此功能使 JSON 结构更简单。 以下示例显示了 Couchbase 和 Azure Cosmos DB 之间的数据模型差异:
Couchbase:文档 ID = “99FF4444”
{ "TravelDocument": { "Country":"India", "Validity" : "2022-09-01", "Person": { "Name": "Manish", "Address": "AB Road, City-z" }, "Visas": [ { "Country":"India", "Type":"Multi-Entry", "Validity":"2022-09-01" }, { "Country":"US", "Type":"Single-Entry", "Validity":"2022-08-01" } ] } }Azure Cosmos DB:请参阅文档中的“ID”,如图所示
{ "id" : "99FF4444", "Country":"India", "Validity" : "2022-09-01", "Person": { "Name": "Manish", "Address": "AB Road, City-z" }, "Visas": [ { "Country":"India", "Type":"Multi-Entry", "Validity":"2022-09-01" }, { "Country":"US", "Type":"Single-Entry", "Validity":"2022-08-01" } ] }
Java SDK 支持
Azure Cosmos DB 具有以下软件开发工具包(SDK),以支持不同的 Java 框架:
- 异步 SDK
- Spring Boot SDK
以下各节介绍了何时使用这些 SDK。 假设有三种类型的工作负荷:
Couchbase 作为文档存储库和基于 Spring Data 的自定义查询
如果要迁移的工作负荷基于 Spring Boot 的 SDK,则可以使用以下步骤:
将父项目添加到 POM.xml 文件中:
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.5.RELEASE</version> <relativePath/> </parent>将属性添加到 POM.xml 文件:
<azure.version>2.1.6</azure.version>将依赖项添加到 POM.xml 文件:
<dependency> <groupId>com.microsoft.azure</groupId> <artifactId>azure-cosmosdb-spring-boot-starter</artifactId> <version>2.1.6</version> </dependency>在资源下添加应用程序属性并指定以下内容。 请确保替换 URL、密钥和数据库名称参数:
azure.cosmosdb.uri=<your-cosmosDB-URL> azure.cosmosdb.key=<your-cosmosDB-key> azure.cosmosdb.database=<your-cosmosDB-dbName>定义模型中集合的名称。 还可以指定进一步的批注。 例如,ID、分区键,用于显式表示它们:
@Document(collection = "mycollection") public class User { @id private String id; private String firstName; @PartitionKey private String lastName; }
下面是 CRUD 操作的代码片段:
插入和更新操作
其中 _repo 是存储库的对象, 文档 是 POJO 类的对象。 您可以使用 .save 插入或更新(如果找到具有指定 ID 的文档,则更新)。 以下代码片段演示如何插入或更新文档对象:
_repo.save(doc);
删除操作
请考虑以下代码片段,其中 doc 对象必须具有 ID 和分区键才能查找和删除该对象:
_repo.delete(doc);
读取操作
可以使用或不指定分区键来读取文档。 如果未指定分区键,则会将其视为跨分区查询。 请考虑以下代码示例,第一个示例将使用 ID 和分区键字段执行操作。 第二个示例使用常规字段,而不指定分区键字段。
_repo.findByIdAndName(objDoc.getId(),objDoc.getName());_repo.findAllByStatus(objDoc.getStatus());
就是这样,现在可以将应用程序与 Azure Cosmos DB 配合使用。 CouchbaseToCosmosDB-SpringCosmos GitHub 存储库中提供了本文档中所述示例的完整代码示例。
将 Couchbase 用作文档存储库并使用 N1QL 查询
N1QL 查询是在 Couchbase 中定义查询的方法。
| N1QL 查询 | Azure Cosmos DB 查询 |
|---|---|
SELECT META(TravelDocument.id AS id, TravelDocument.* FROM TravelDocument WHERE _type = “com.xx.xx.xx.xxx.xxx.xxxx” and country = 'India' and ANY m in Visas SATISFIES m.type == 'Multi-Entry' and m.Country IN ['印度', 不丹'] ORDER BY Validity DESC LIMIT 25 OFFSET 0 |
SELECT c.id,c FROM c JOIN m in c.country='India' WHERE c._type = 'com.xx.xx.xx.xxx.xxxx' and c.country = 'India' and m.type = 'Multi-Entry' and m.Country IN ('印度', 'Bhutan') ORDER BY c.Validity DESC OFFSET 0 LIMIT 25 |
可以在 N1QL 查询中注意到以下更改:
无需使用 META 关键字或引用第一级文档。 相反,您可以创建您自己的容器引用。 在此示例中,我们将其视为“c”(可以是任何内容)。 此引用用作所有第一级字段的前缀。 例如,c.id、c.country 等。
现在,您可以在子文档上执行联接,并使用专用别名(例如“m”)来引用它,而不是使用“ANY”。 为子文档创建别名后,需要使用别名。 例如,m.Country。
OFFSET 序列在 Azure Cosmos DB 查询中有所不同,首先需要指定 OFFSET,然后再指定 LIMIT。 如果使用自定义定义的最大查询,建议不要使用 Spring Data SDK,因为将查询传递给 Azure Cosmos DB 时,客户端可能会产生不必要的开销。 相反,我们有一个直接的异步 Java SDK,在本例中可以有效地利用它。
读取操作
通过以下步骤使用异步 Java SDK:
将以下依赖项配置为 POM.xml 文件:
<!-- https://mvnrepository.com/artifact/com.microsoft.azure/azure-cosmosdb --> <dependency> <groupId>com.microsoft.azure</groupId> <artifactId>azure-cosmos</artifactId> <version>3.0.0</version> </dependency>使用
ConnectionBuilder此方法为 Azure Cosmos DB 创建连接对象,如以下示例所示。 请确保将此声明放入 bean 中,以便以下代码应仅执行一次:ConnectionPolicy cp=new ConnectionPolicy(); cp.connectionMode(ConnectionMode.DIRECT); if(client==null) client= CosmosClient.builder() .endpoint(Host)//(Host, PrimaryKey, dbName, collName).Builder() .connectionPolicy(cp) .key(PrimaryKey) .consistencyLevel(ConsistencyLevel.EVENTUAL) .build(); container = client.getDatabase(_dbName).getContainer(_collName);若要执行查询,需要运行以下代码片段:
Flux<FeedResponse<CosmosItemProperties>> objFlux= container.queryItems(query, fo);
现在,借助上述方法,可以传递多个查询并执行,而无需任何麻烦。 如果要求执行一个大型查询,可以拆分为多个查询,然后尝试以下代码片段,而不是前面的代码片段:
for(SqlQuerySpec query:queries)
{
objFlux= container.queryItems(query, fo);
objFlux .publishOn(Schedulers.elastic())
.subscribe(feedResponse->
{
if(feedResponse.results().size()>0)
{
_docs.addAll(feedResponse.results());
}
},
Throwable::printStackTrace,latch::countDown);
lstFlux.add(objFlux);
}
Flux.merge(lstFlux);
latch.await();
}
使用前面的代码,可以并行运行查询,并增加分布式执行以优化。 此外,还可以运行插入和更新操作:
插入操作
若要插入文档,请运行以下代码:
Mono<CosmosItemResponse> objMono= container.createItem(doc,ro);
然后按照如下步骤订阅 Mono:
CountDownLatch latch=new CountDownLatch(1);
objMono .subscribeOn(Schedulers.elastic())
.subscribe(resourceResponse->
{
if(resourceResponse.statusCode()!=successStatus)
{
throw new RuntimeException(resourceResponse.toString());
}
},
Throwable::printStackTrace,latch::countDown);
latch.await();
更新插入操作
Upsert操作要求您指定需要更新的文档。 为了获取完整文档,您可以使用在"读取操作"标题下提到的代码片段,然后修改所需的字段。 以下代码片段向上插入文档:
Mono<CosmosItemResponse> obs= container.upsertItem(doc, ro);
然后订阅 mono。 请参阅插入操作中的 mono 订阅代码段。
删除操作
以下代码片段将执行删除操作:
CosmosItem objItem= container.getItem(doc.Id, doc.Tenant);
Mono<CosmosItemResponse> objMono = objItem.delete(ro);
然后订阅 mono,在插入操作中参考 mono 订阅代码片段。 CouchbaseToCosmosDB-AsyncInSpring GitHub 存储库中提供了完整的代码示例。
Couchbase 作为键/值对
这是一种简单的工作负荷类型,可在其中执行查找,而不是查询。 使用以下步骤来处理键/值对:
请考虑将“/ID”作为主键,这将确保可以直接在特定分区中执行查找作。 创建集合并将“/ID”指定为分区键。
完全关闭索引功能。 由于将执行查找操作,因此承载索引开销没有意义。 若要关闭索引,请登录到 Azure 门户,转到 Azure Cosmos DB 帐户。 打开 数据资源管理器,选择 数据库 和 容器。 打开 “缩放和设置” 选项卡,然后选择 “索引策略”。 当前索引策略如下所示:
{ "indexingMode": "consistent", "automatic": true, "includedPaths": [ { "path": "/*" } ], "excludedPaths": [ { "path": "/\"_etag\"/?" } ] }将上述索引策略替换为以下策略:
{ "indexingMode": "none", "automatic": false, "includedPaths": [], "excludedPaths": [] }使用以下代码片段创建连接对象。 连接对象(要放入 @Bean 或使其为静态):
ConnectionPolicy cp=new ConnectionPolicy(); cp.connectionMode(ConnectionMode.DIRECT); if(client==null) client= CosmosClient.builder() .endpoint(Host)//(Host, PrimaryKey, dbName, collName).Builder() .connectionPolicy(cp) .key(PrimaryKey) .consistencyLevel(ConsistencyLevel.EVENTUAL) .build(); container = client.getDatabase(_dbName).getContainer(_collName);
现在可以执行 CRUD 操作,如下所示:
读取操作
若要读取该项,请使用以下代码片段:
CosmosItemRequestOptions ro=new CosmosItemRequestOptions();
ro.partitionKey(new PartitionKey(documentId));
CountDownLatch latch=new CountDownLatch(1);
var objCosmosItem= container.getItem(documentId, documentId);
Mono<CosmosItemResponse> objMono = objCosmosItem.read(ro);
objMono .subscribeOn(Schedulers.elastic())
.subscribe(resourceResponse->
{
if(resourceResponse.item()!=null)
{
doc= resourceResponse.properties().toObject(UserModel.class);
}
},
Throwable::printStackTrace,latch::countDown);
latch.await();
插入操作
若要插入项,可以执行以下代码:
Mono<CosmosItemResponse> objMono= container.createItem(doc,ro);
然后订阅 mono,如下所示:
CountDownLatch latch=new CountDownLatch(1);
objMono.subscribeOn(Schedulers.elastic())
.subscribe(resourceResponse->
{
if(resourceResponse.statusCode()!=successStatus)
{
throw new RuntimeException(resourceResponse.toString());
}
},
Throwable::printStackTrace,latch::countDown);
latch.await();
更新插入操作
若要更新项的值,请参阅以下代码片段:
Mono<CosmosItemResponse> obs= container.upsertItem(doc, ro);
然后订阅 mono,在插入操作中参考 mono 订阅代码片段。
删除操作
使用以下代码片段执行删除作:
CosmosItem objItem= container.getItem(id, id);
Mono<CosmosItemResponse> objMono = objItem.delete(ro);
然后订阅 mono,在插入操作中参考 mono 订阅代码片段。 CouchbaseToCosmosDB-AsyncKeyValue GitHub 存储库中提供了完整的代码示例。
数据迁移
使用 Azure 数据工厂 迁移数据。 这是迁移数据的最推荐方法。 将源配置为 Couchbase,将接收器配置为 Azure Cosmos DB for NoSQL,请参阅 Azure Azure Cosmos DB 数据工厂连接器 文章,了解详细步骤。
后续步骤
- 若要优化代码,请参阅 Azure Cosmos DB 性能提示一文。
- 浏览 Java Async V3 SDK,SDK 引用 GitHub 存储库。