共用方式為

将应用程序从 Amazon DynamoDB 迁移到 Azure Cosmos DB

适用范围: NoSQL

Azure Cosmos DB 是可缩放的多区域分布式完全托管型数据库。 它提供了对数据的低延迟访问保证。

本文介绍如何使用最少的代码更改将 .NET 应用程序从 Amazon DynamoDB 迁移到 Azure Cosmos DB。 若要详细了解 Azure Cosmos DB,请参阅概述一文。

概念差异

下表列出了 Azure Cosmos DB 和 DynamoDB 之间的主要概念差异:

DynamoDB Azure Cosmos DB
不适用 数据库
集合
文档
属性 字段
辅助索引 辅助索引
主键 > 分区键 分区键
主键 > 排序键 不是必需
Stream 更改源
写入计算单位 请求单位(灵活,可用于读取或写入)
读取计算单位 请求单位(灵活,可用于读取或写入)
全局表 不需要。 预配 Azure Cosmos DB 帐户时,可以直接选择区域。 (稍后可以更改区域。

结构差异

Azure Cosmos DB 的 JSON 结构比 DynamoDB 的 JSON 结构更简单。 以下示例显示了差异。

DynamoDB

以下 JSON 对象表示 DynamoDB 中的数据格式:

{
  "TableName": "Music",
  "KeySchema": [
    { 
      "AttributeName": "Artist",
      "KeyType": "HASH",
    },
    { 
      "AttributeName": "SongTitle",
      "KeyType": "RANGE"
    }
    ],
    "AttributeDefinitions": [
    { 
      "AttributeName": "Artist",
      "AttributeType": "S"
    },
    { 
      "AttributeName": "SongTitle",
      "AttributeType": "S"
    }
  ],
  "ProvisionedThroughput": {
    "ReadCapacityUnits": 1,
    "WriteCapacityUnits": 1
  }
}

使用 Artist 分区键和 SongTitle 排序键。

Azure Cosmos DB

以下 JSON 对象表示 Azure Cosmos DB 中的数据格式:

{
  "Artist": "",
  "SongTitle": "",
  "AlbumTitle": "",
  "Year": 9999,
  "Price": 0.0,
  "Genre": "",
  "Tags": ""
}

迁移代码

本文的范围是将应用程序的代码迁移到 Azure Cosmos DB,这是数据库迁移的关键方面。 为了帮助你了解迁移过程的工作原理,以下部分比较 Amazon DynamoDB 和 Azure Cosmos DB 之间的代码。

要下载源代码,请克隆以下存储库:

git clone https://github.com/Azure-Samples/DynamoDB-to-CosmosDB

先决条件

  • .NET Framework 4.7.2
  • 最新版本的 Visual Studio,包含 Azure 开发工作负载。 开始时,可以先使用免费的 Visual Studio Community IDE。 在安装 Visual Studio 的过程中,请启用“Azure 开发”工作负载。
  • 访问 Azure Cosmos DB for NoSQL 帐户。
  • Amazon DynamoDB 的本地安装。
  • Java 8.
  • Amazon DynamoDB 的可下载版本。 在端口 8000 上运行它。 (可以更改和配置代码。

设置代码

将以下 NuGet 包添加到项目:

Install-Package Microsoft.Azure.Cosmos

建立连接

DynamoDB

在 Amazon DynamoDB 中,使用以下代码进行连接:

AmazonDynamoDBConfig addbConfig = new AmazonDynamoDBConfig();
addbConfig.ServiceURL = "endpoint";
try
{
    aws_dynamodbclient = new AmazonDynamoDBClient(addbConfig);
}
catch { }

Azure Cosmos DB

若要连接 Azure Cosmos DB,请将代码更新为:

client_documentDB = new CosmosClient(
    "<nosql-account-endpoint>",
    tokenCredential
);

优化 Azure Cosmos DB 中的连接

借助 Azure Cosmos DB,可使用以下代码来优化连接:

  • ConnectionMode:使用直接连接模式连接到 Azure Cosmos DB 服务中的数据节点。 使用网关模式仅初始化和缓存逻辑地址,并在更新时进行刷新。 有关详细信息,请参阅 Azure Cosmos DB SQL SDK 连接模式

  • ApplicationRegion:使用此选项可设置首选异地复制区域,以便与 Azure Cosmos DB 交互。 有关详细信息,请参阅 使用 Azure Cosmos DB 在多个区域分发数据

  • ConsistencyLevel:使用此选项替代默认一致性级别。 有关详细信息,请参阅 Azure Cosmos DB 中的一致性级别

  • BulkExecutionMode:使用此选项通过设置AllowBulkExecution属性为true来执行批量操作。 有关详细信息,请参阅 使用 .NET SDK 将数据批量导入到 Azure Cosmos DB for NoSQL 帐户

    client_cosmosDB = new CosmosClient("Your connection string", new CosmosClientOptions()
         {
             ConnectionMode = ConnectionMode.Direct,
             ApplicationRegion = Regions.EastUS2,
             ConsistencyLevel = ConsistencyLevel.Session,
             AllowBulkExecution = true,
         });
    

创建容器

DynamoDB

若要将数据存储在 Amazon DynamoDB 中,需要先创建表。 定义架构、键类型和属性,如以下代码所示:

// movies_key_schema
public static List<KeySchemaElement> moviesKeySchema =
    new List<KeySchemaElement>
    {
        new KeySchemaElement
        {
            AttributeName = partitionKeyName,
            KeyType = "HASH"
        },
        new KeySchemaElement
        {
            AttributeName = sortKeyName,
            KeyType = "RANGE"
        },
    };

// key names for the Movies table
public const string partitionKeyName = "year";
public const string sortKeyName = "title";
public const int readUnits = 1, writeUnits = 1;

// movie_items_attributes
public static List<AttributeDefinition> movieItemsAttributes =
    new List<AttributeDefinition>
    {
        new AttributeDefinition
        {
            AttributeName = partitionKeyName,
            AttributeType = "N"
        },
        new AttributeDefinition
        {
            AttributeName = sortKeyName,
            AttributeType = "S"
        }
    };

CreateTableRequest request;
CreateTableResponse response;

// Build the 'CreateTableRequest' structure for the new table
request = new CreateTableRequest
{
    TableName             = tableName,
    AttributeDefinitions  = tableAttributes,
    KeySchema             = tableKeySchema,
    // Provisioned-throughput settings are always required,
    // although the local test version of DynamoDB ignores them.
    ProvisionedThroughput = new ProvisionedThroughput(readUnits, writeUnits)
};

Azure Cosmos DB

在 Amazon DynamoDB 中,需要预配读取计算单元和写入计算单元。 在 Azure Cosmos DB 中,将吞吐量指定为每秒请求单位(RU/s)。 可以动态地将 RU/s 用于进行任何操作。 数据按顺序组织为数据库、容器,然后是项。 可以在数据库级别、集合级别或同时指定吞吐量。

若要创建数据库:

await client_cosmosDB.CreateDatabaseIfNotExistsAsync(movies_table_name);

创建容器:

await cosmosDatabase.CreateContainerIfNotExistsAsync(new ContainerProperties()
    {
        PartitionKeyPath = "/" + partitionKey,
        Id = newCollectionName
    },
    provisionedThroughput);

加载数据

DynamoDB

以下代码演示了如何在 Amazon DynamoDB 中加载数据。 代码 moviesArray 列出了 JSON 文档,然后需要循环访问 JSON 文档并将其加载到 Amazon DynamoDB 中。

int n = moviesArray.Count;
for (int i = 0, j = 99; i < n; i++)
{
    try
    {
        string itemJson = moviesArray[i].ToString();
        Document doc = Document.FromJson(itemJson);
        Task putItem = moviesTable.PutItemAsync(doc);
        if (i >= j)
        {
          j++;
          Console.Write("{0,5:#,##0}, ", j);
          if (j % 1000 == 0)
              Console.Write("\n ");
          j += 99;
        }
        await putItem;
    }
    catch { }
}

Azure Cosmos DB

在 Azure Cosmos DB 中,可以选择使用 moviesContainer.CreateItemStreamAsync()流式传输和写入。 但是,在此示例中,JSON 将反序列化为 MovieModel 该类型,以演示类型转换功能。 该代码是多线程代码,使用 Azure Cosmos DB 中的分布式体系结构加快加载速度。

List<Task> concurrentTasks = new List<Task>();
for (int i = 0, j = 99; i < n; i++)
{
  try
  {
      MovieModel doc = JsonConvert.DeserializeObject<MovieModel>(moviesArray[i].ToString());
      doc.Id = Guid.NewGuid().ToString();
      concurrentTasks.Add(moviesContainer.CreateItemAsync(doc,new PartitionKey(doc.Year)));
      if (i >= j)
      {
          j++;
          Console.Write("{0,5:#,##0}, ", j);
          if (j % 1000 == 0)
              Console.Write("\n               ");
          j += 99;
      }
  }
  catch (Exception ex)
  {
      Console.WriteLine("\n     ERROR: Could not write the movie record #{0:#,##0}, because:\n       {1}",
                          i, ex.Message);
      operationFailed = true;
      break;
  }
}
await Task.WhenAll(concurrentTasks);

创建文档

DynamoDB

在 Amazon DynamoDB 中写入新文档并不安全。 以下示例将 newItem 用作文档类型:

Document writeNew = await moviesTable.PutItemAsync(newItem, token);

Azure Cosmos DB

Azure Cosmos DB 通过数据模型提供类型安全性。 此示例使用名为 MovieModel: 的数据模型:

public class MovieModel
{
    [JsonProperty("id")]
    public string Id { get; set; }
    [JsonProperty("title")]
    public string Title{ get; set; }
    [JsonProperty("year")]
    public int Year { get; set; }
    [JsonProperty("info")]
    public MovieInfo MovieInfo { get; set; }

    public MovieModel(string title, int year)
    {
        this.Title = title;
        this.Year = year;
    }
    public MovieModel() { }

    internal string PrintInfo()
    {
        if (this.MovieInfo != null)
            return string.Format(
                "\nMovie with title:{1}\n Year: {2}, Actors: {3}\n Directors:{4}\n Rating:{5}\n",
                this.Id,
                this.Title,
                this.Year,
                String.Join(",",this.MovieInfo.Actors),
                this.MovieInfo,
                this.MovieInfo.Rating);
        else
            return string.Format(
                "\nMovie with  title:{0}\n Year: {1}\n",
                this.Title,
                this.Year);
    }
}

在 Azure Cosmos DB 中, newItemMovieModel

MovieModel movieModel = new MovieModel
{
    Id = Guid.NewGuid().ToString(),
    Title = "The Big New Movie",
    Year = 2018,
    MovieInfo = new MovieInfo() { Plot = "Nothing happens at all.", Rating = 0 }
};
await moviesContainer.CreateItemAsync(movieModel, new Microsoft.Azure.Cosmos.PartitionKey(movieModel.Year));

读取文档

DynamoDB

若要在 Amazon DynamoDB 中进行读取,需要定义基元:

// Create primitives for the HASH and RANGE portions of the primary key
Primitive hash = new Primitive(year.ToString(), true);
Primitive range = new Primitive(title, false);

Document movieRecord = await moviesTable.GetItemAsync(hash, range, token);

Azure Cosmos DB

使用 Azure Cosmos DB 时,查询是自然的(LINQ):

IQueryable<MovieModel> movieQuery = moviesContainer.GetItemLinqQueryable<MovieModel>(true)
                        .Where(f => f.Year == year && f.Title == title);
// The query is executed synchronously here, but can also be executed asynchronously via the IDocumentQuery<T> interface
foreach (MovieModel movie in movieQuery)
{
    movie_record_cosmosdb = movie;
}

前面的示例中的文档集合类型安全,并提供自然查询选项。

更新项

DynamoDB

更新 Amazon DynamoDB 中的项目:

updateResponse = await client.UpdateItemAsync(updateRequest);

Azure Cosmos DB

在 Azure Cosmos DB 中,更新被视为 Upsert 操作(即如果文档不存在,则插入文档)。

await moviesContainer.UpsertItemAsync<MovieModel>(updatedMovieModel);

删除文档

DynamoDB

若要删除 Amazon DynamoDB 中的项,需要再次使用基元:

Primitive hash = new Primitive(year.ToString(), true);
Primitive range = new Primitive(title, false);
DeleteItemOperationConfig deleteConfig = new DeleteItemOperationConfig();
deleteConfig.ConditionalExpression = condition;
deleteConfig.ReturnValues = ReturnValues.AllOldAttributes;
  
Document deletedItem = await table.DeleteItemAsync(hash, range, deleteConfig);

Azure Cosmos DB

在 Azure Cosmos DB 中,可以获取文档并将其异步删除:

var result = ReadingMovieItem_async_List_CosmosDB("SELECT * FROM c WHERE c.info.rating > 7 AND c.year = 2018 AND c.title = 'The Big New Movie'");
while (result.HasMoreResults)
{
    var resultModel = await result.ReadNextAsync();
    foreach (var movie in resultModel.ToList<MovieModel>())
    {
        await moviesContainer.DeleteItemAsync<MovieModel>(movie.Id, new PartitionKey(movie.Year));
    }
}

查询文档

DynamoDB

在 Amazon DynamoDB 中,需要 API 函数才能查询数据:

QueryOperationConfig config = new QueryOperationConfig();
config.Filter = new QueryFilter();
config.Filter.AddCondition("year", QueryOperator.Equal, new DynamoDBEntry[ ] { 1992 });
config.Filter.AddCondition("title", QueryOperator.Between, new DynamoDBEntry[ ] { "B", "Hzz" });
config.AttributesToGet = new List<string> { "year", "title", "info" };
config.Select = SelectValues.SpecificAttributes;
search = moviesTable.Query(config);

Azure Cosmos DB

在 Azure Cosmos DB 中,可以在简单的 SQL 查询中执行投影和筛选:

var result = moviesContainer.GetItemQueryIterator<MovieModel>( 
  "SELECT c.Year, c.Title, c.info FROM c WHERE Year = 1998 AND (CONTAINS(Title, 'B') OR CONTAINS(Title, 'Hzz'))");

对于范围操作(例如 between),您需要在 Amazon DynamoDB 中进行扫描:

ScanRequest sRequest = new ScanRequest
{
    TableName = "Movies",
    ExpressionAttributeNames = new Dictionary<string, string>
    {
        { "#yr", "year" }
    },
    ExpressionAttributeValues = new Dictionary<string, AttributeValue>
    {
        { ":y_a", new AttributeValue { N = "1960" } },
        { ":y_z", new AttributeValue { N = "1969" } },
    },
    FilterExpression = "#yr between :y_a and :y_z",
    ProjectionExpression = "#yr, title, info.actors[0], info.directors, info.running_time_secs"
};

ClientScanning_async(sRequest).Wait();

在 Azure Cosmos DB 中,可以使用 SQL 查询和单行语句:

var result = moviesContainer.GetItemQueryIterator<MovieModel>(
    "SELECT c.title, c.info.actors[0], c.info.directors, c.info.running_time_secs FROM c WHERE c.year BETWEEN 1960 AND 1969");

删除容器

DynamoDB

若要删除 Amazon DynamoDB 中的表,可指定:

await client.DeleteTableAsync(tableName);

Azure Cosmos DB

若要删除 Azure Cosmos DB 中的集合,可指定:

await moviesContainer.DeleteContainerAsync();

然后,如有必要,也删除数据库:

await cosmosDatabase.DeleteAsync();

概要

如前面的示例所示,Azure Cosmos DB 支持 SQL 查询,操作是异步的。 可以轻松地将复杂代码迁移到 Azure Cosmos DB。 迁移后,代码变得更加简单。