适用范围: NoSQL
Azure Cosmos DB 是可缩放的多区域分布式完全托管型数据库。 它提供了对数据的低延迟访问保证。
本文介绍如何使用最少的代码更改将 .NET 应用程序从 Amazon DynamoDB 迁移到 Azure Cosmos DB。 若要详细了解 Azure Cosmos DB,请参阅概述一文。
下表列出了 Azure Cosmos DB 和 DynamoDB 之间的主要概念差异:
DynamoDB | Azure Cosmos DB(Azure 宇宙数据库) |
---|---|
不适用 | 数据库 |
表 | 集合 |
项 | 文档 |
属性 | 字段 |
辅助索引 | 辅助索引 |
主键 > 分区键 | 分区键 |
主键 > 排序键 | 不是必需 |
溪流 | 更改源 |
写入计算单位 | 请求单位(灵活,可用于读取或写入) |
读取计算单位 | 请求单位(灵活,可用于读取或写入) |
全局表 | 不需要。 预配 Azure Cosmos DB 帐户时,可以直接选择区域。 (稍后可以更改区域。 |
Azure Cosmos DB 的 JSON 结构比 DynamoDB 的 JSON 结构更简单。 以下示例显示了差异。
以下 JSON 对象表示 DynamoDB 中的数据格式:
{
TableName: "Music",
KeySchema: [
{
AttributeName: "Artist",
KeyType: "HASH", //Partition key
},
{
AttributeName: "SongTitle",
KeyType: "RANGE" //Sort key
}
],
AttributeDefinitions: [
{
AttributeName: "Artist",
AttributeType: "S"
},
{
AttributeName: "SongTitle",
AttributeType: "S"
}
],
ProvisionedThroughput: {
ReadCapacityUnits: 1,
WriteCapacityUnits: 1
}
}
以下 JSON 对象表示 Azure Cosmos DB 中的数据格式:
{
"Artist": "",
"SongTitle": "",
"AlbumTitle": "",
"Year": 9999,
"Price": 0.0,
"Genre": "",
"Tags": ""
}
本文的范围是将应用程序的代码迁移到 Azure Cosmos DB,这是数据库迁移的关键方面。 为了帮助你了解迁移过程的工作原理,以下部分比较 Amazon DynamoDB 和 Azure Cosmos DB 之间的代码。
要下载源代码,请克隆以下存储库:
git clone https://github.com/Azure-Samples/DynamoDB-to-CosmosDB
- .NET Framework 4.7.2
- 最新版本的 Visual Studio,包含 Azure 开发工作负载。 开始时,可以先使用免费的 Visual Studio Community IDE。 在安装 Visual Studio 的过程中,请启用“Azure 开发”工作负载。
- 访问 Azure Cosmos DB for NoSQL 帐户。
- Amazon DynamoDB 的本地安装。
- Java 8.
- Amazon DynamoDB 的可下载版本。 在端口 8000 上运行它。 (可以更改和配置代码。
将以下 NuGet 包添加到项目:
Install-Package Microsoft.Azure.Cosmos
在 Amazon DynamoDB 中,使用以下代码进行连接:
AmazonDynamoDBConfig addbConfig = new AmazonDynamoDBConfig() ;
addbConfig.ServiceURL = "endpoint";
try { aws_dynamodbclient = new AmazonDynamoDBClient( addbConfig ); }
若要连接 Azure Cosmos DB,请将代码更新为:
client_documentDB = new CosmosClient(
"<nosql-account-endpoint>",
tokenCredential
);
借助 Azure Cosmos DB,可使用以下代码来优化连接:
ConnectionMode
:使用直接连接模式连接到 Azure Cosmos DB 服务中的数据节点。 使用网关模式仅初始化和缓存逻辑地址,并在更新时进行刷新。 有关详细信息,请参阅 Azure Cosmos DB SQL SDK 连接模式。ApplicationRegion
:使用此选项可设置首选异地复制区域,以便与 Azure Cosmos DB 交互。 有关详细信息,请参阅 使用 Azure Cosmos DB 在多个区域分发数据。ConsistencyLevel
:使用此选项替代默认一致性级别。 有关详细信息,请参阅 Azure Cosmos DB 中的一致性级别。BulkExecutionMode
:使用此选项通过设置AllowBulkExecution
属性为true
来执行批量操作。 有关详细信息,请参阅 使用 .NET SDK 将数据批量导入到 Azure Cosmos DB for NoSQL 帐户。client_cosmosDB = new CosmosClient(" Your connection string ",new CosmosClientOptions() { ConnectionMode=ConnectionMode.Direct, ApplicationRegion=Regions.EastUS2, ConsistencyLevel=ConsistencyLevel.Session, AllowBulkExecution=true });
若要将数据存储在 Amazon DynamoDB 中,需要先创建表。 定义架构、键类型和属性,如以下代码所示:
// movies_key_schema
public static List<KeySchemaElement> movies_key_schema
= new List<KeySchemaElement>
{
new KeySchemaElement
{
AttributeName = partition_key_name,
KeyType = "HASH"
},
new KeySchemaElement
{
AttributeName = sort_key_name,
KeyType = "RANGE"
}
};
// key names for the Movies table
public const string partition_key_name = "year";
public const string sort_key_name = "title";
public const int readUnits=1, writeUnits=1;
// movie_items_attributes
public static List<AttributeDefinition> movie_items_attributes
= new List<AttributeDefinition>
{
new AttributeDefinition
{
AttributeName = partition_key_name,
AttributeType = "N"
},
new AttributeDefinition
{
AttributeName = sort_key_name,
AttributeType = "S"
}
CreateTableRequest request;
CreateTableResponse response;
// Build the 'CreateTableRequest' structure for the new table
request = new CreateTableRequest
{
TableName = table_name,
AttributeDefinitions = table_attributes,
KeySchema = table_key_schema,
// Provisioned-throughput settings are always required,
// although the local test version of DynamoDB ignores them.
ProvisionedThroughput = new ProvisionedThroughput( readUnits, writeUnits );
};
在 Amazon DynamoDB 中,需要预配读取计算单元和写入计算单元。 在 Azure Cosmos DB 中,将吞吐量指定为每秒请求单位(RU/s)。 可以动态地将 RU/s 用于进行任何操作。 数据按顺序组织为数据库、容器,然后是项。 可以在数据库级别、集合级别或同时指定吞吐量。
若要创建数据库:
await client_cosmosDB.CreateDatabaseIfNotExistsAsync(movies_table_name);
创建容器:
await cosmosDatabase.CreateContainerIfNotExistsAsync(new ContainerProperties() { PartitionKeyPath = "/" + partitionKey, Id = new_collection_name }, provisionedThroughput);
以下代码演示了如何在 Amazon DynamoDB 中加载数据。 代码 moviesArray
列出了 JSON 文档,然后需要循环访问 JSON 文档并将其加载到 Amazon DynamoDB 中。
int n = moviesArray.Count;
for( int i = 0, j = 99; i < n; i++ )
{
try
{
string itemJson = moviesArray[i].ToString();
Document doc = Document.FromJson(itemJson);
Task putItem = moviesTable.PutItemAsync(doc);
if( i >= j )
{
j++;
Console.Write( "{0,5:#,##0}, ", j );
if( j % 1000 == 0 )
Console.Write( "\n " );
j += 99;
}
await putItem;
在 Azure Cosmos DB 中,可以选择使用 moviesContainer.CreateItemStreamAsync()
流式传输和写入。 但是,在此示例中,JSON 将反序列化为 MovieModel
该类型,以演示类型转换功能。 该代码是多线程代码,使用 Azure Cosmos DB 中的分布式体系结构加快加载速度。
List<Task> concurrentTasks = new List<Task>();
for (int i = 0, j = 99; i < n; i++)
{
try
{
MovieModel doc= JsonConvert.DeserializeObject<MovieModel>(moviesArray[i].ToString());
doc.Id = Guid.NewGuid().ToString();
concurrentTasks.Add(moviesContainer.CreateItemAsync(doc,new PartitionKey(doc.Year)));
{
j++;
Console.Write("{0,5:#,##0}, ", j);
if (j % 1000 == 0)
Console.Write("\n ");
j += 99;
}
}
catch (Exception ex)
{
Console.WriteLine("\n ERROR: Could not write the movie record #{0:#,##0}, because:\n {1}",
i, ex.Message);
operationFailed = true;
break;
}
}
await Task.WhenAll(concurrentTasks);
在 Amazon DynamoDB 中写入新文档并不安全。 以下示例将 newItem
用作文档类型:
Task<Document> writeNew = moviesTable.PutItemAsync(newItem, token);
await writeNew;
Azure Cosmos DB 通过数据模型提供类型安全性。 此示例使用名为 MovieModel
: 的数据模型:
public class MovieModel
{
[JsonProperty("id")]
public string Id { get; set; }
[JsonProperty("title")]
public string Title{ get; set; }
[JsonProperty("year")]
public int Year { get; set; }
public MovieModel(string title, int year)
{
this.Title = title;
this.Year = year;
}
public MovieModel()
{
}
[JsonProperty("info")]
public MovieInfo MovieInfo { get; set; }
internal string PrintInfo()
{
if(this.MovieInfo!=null)
return string.Format("\nMovie with title:{1}\n Year: {2}, Actors: {3}\n Directors:{4}\n Rating:{5}\n", this.Id, this.Title, this.Year, String.Join(",",this.MovieInfo.Actors), this.MovieInfo, this.MovieInfo.Rating);
else
return string.Format("\nMovie with title:{0}\n Year: {1}\n", this.Title, this.Year);
}
}
在 Azure Cosmos DB 中, newItem
为 MovieModel
:
MovieModel movieModel = new MovieModel()
{
Id = Guid.NewGuid().ToString(),
Title = "The Big New Movie",
Year = 2018,
MovieInfo = new MovieInfo() { Plot = "Nothing happens at all.", Rating = 0 }
};
var writeNew= moviesContainer.CreateItemAsync(movieModel, new Microsoft.Azure.Cosmos.PartitionKey(movieModel.Year));
await writeNew;
若要在 Amazon DynamoDB 中进行读取,需要定义基元:
// Create primitives for the HASH and RANGE portions of the primary key
Primitive hash = new Primitive(year.ToString(), true);
Primitive range = new Primitive(title, false);
Task<Document> readMovie = moviesTable.GetItemAsync(hash, range, token);
movie_record = await readMovie;
使用 Azure Cosmos DB 时,查询是自然的(LINQ):
IQueryable<MovieModel> movieQuery = moviesContainer.GetItemLinqQueryable<MovieModel>(true)
.Where(f => f.Year == year && f.Title == title);
// The query is executed synchronously here, but can also be executed asynchronously via the IDocumentQuery<T> interface
foreach (MovieModel movie in movieQuery)
{
movie_record_cosmosdb = movie;
}
前面的示例中的文档集合类型安全,并提供自然查询选项。
更新 Amazon DynamoDB 中的项目:
updateResponse = await client.UpdateItemAsync( updateRequest );
在 Azure Cosmos DB 中,更新被视为 Upsert
操作(即如果文档不存在,则插入文档)。
await moviesContainer.UpsertItemAsync<MovieModel>(updatedMovieModel);
若要删除 Amazon DynamoDB 中的项,需要再次使用基元:
Primitive hash = new Primitive(year.ToString(), true);
Primitive range = new Primitive(title, false);
DeleteItemOperationConfig deleteConfig = new DeleteItemOperationConfig( );
deleteConfig.ConditionalExpression = condition;
deleteConfig.ReturnValues = ReturnValues.AllOldAttributes;
Task<Document> delItem = table.DeleteItemAsync( hash, range, deleteConfig );
deletedItem = await delItem;
在 Azure Cosmos DB 中,可以获取文档并将其异步删除:
var result= ReadingMovieItem_async_List_CosmosDB("select * from c where c.info.rating>7 AND c.year=2018 AND c.title='The Big New Movie'");
while (result.HasMoreResults)
{
var resultModel = await result.ReadNextAsync();
foreach (var movie in resultModel.ToList<MovieModel>())
{
await moviesContainer.DeleteItemAsync<MovieModel>(movie.Id, new PartitionKey(movie.Year));
}
}
在 Amazon DynamoDB 中,需要 API 函数才能查询数据:
QueryOperationConfig config = new QueryOperationConfig( );
config.Filter = new QueryFilter( );
config.Filter.AddCondition( "year", QueryOperator.Equal, new DynamoDBEntry[ ] { 1992 } );
config.Filter.AddCondition( "title", QueryOperator.Between, new DynamoDBEntry[ ] { "B", "Hzz" } );
config.AttributesToGet = new List<string> { "year", "title", "info" };
config.Select = SelectValues.SpecificAttributes;
search = moviesTable.Query( config );
在 Azure Cosmos DB 中,可以在简单的 SQL 查询中执行投影和筛选:
var result = moviesContainer.GetItemQueryIterator<MovieModel>(
"select c.Year, c.Title, c.info from c where Year=1998 AND (CONTAINS(Title,'B') OR CONTAINS(Title,'Hzz'))");
对于范围操作(例如 between
),您需要在 Amazon DynamoDB 中进行扫描:
ScanRequest sRequest = new ScanRequest
{
TableName = "Movies",
ExpressionAttributeNames = new Dictionary<string, string>
{
{ "#yr", "year" }
},
ExpressionAttributeValues = new Dictionary<string, AttributeValue>
{
{ ":y_a", new AttributeValue { N = "1960" } },
{ ":y_z", new AttributeValue { N = "1969" } },
},
FilterExpression = "#yr between :y_a and :y_z",
ProjectionExpression = "#yr, title, info.actors[0], info.directors, info.running_time_secs"
};
ClientScanning_async( sRequest ).Wait( );
在 Azure Cosmos DB 中,可以使用 SQL 查询和单行语句:
var result = moviesContainer.GetItemQueryIterator<MovieModel>(
"select c.title, c.info.actors[0], c.info.directors,c.info.running_time_secs from c where BETWEEN year 1960 AND 1969");
若要删除 Amazon DynamoDB 中的表,可指定:
client.DeleteTableAsync( tableName );
若要删除 Azure Cosmos DB 中的集合,可指定:
await moviesContainer.DeleteContainerAsync();
然后,如有必要,也删除数据库:
await cosmosDatabase.DeleteAsync();
如前面的示例所示,Azure Cosmos DB 支持 SQL 查询,操作是异步的。 可以轻松地将复杂代码迁移到 Azure Cosmos DB。 迁移后,代码变得更加简单。
- 了解性能优化。
- 了解如何 优化读取和写入。
- 了解 Azure Cosmos DB 中的监视。