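Bulk import data to an Azure Cosmos DB for NoSQL account by using the .NET SDK
Applies to: NoSQL
This tutorial shows how to build a .NET console application that optimizes the provisioned throughput (RU/s) required to import data to Azure Cosmos DB.
In this article, you read data from a sample data source and import it into an Azure Cosmos DB container. This tutorial uses version 3.0+ of the Azure Cosmos DB .NET SDK, which can target .NET Framework or .NET Core.
This tutorial covers:
- Creating an Azure Cosmos DB account
- Configuring your project
- Connecting to an Azure Cosmos DB account with bulk support enabled
- Performing a data import through concurrent create operations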
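Prerequisites
Before following the instructions in this article, make sure that you have the following resources:
- An active Azure account. If you don't have an Azure subscription, you can create a trial account before you begin. You can also create an Azure Cosmos DB free tier account, which gives you the first 1000 RU/s of throughput and 25 GB of storage for free. Alternatively, you can use the Azure Cosmos DB emulator with the URI https://localhost:8081. For the key to use with the emulator, see Authenticating requests.
- The .NET Core 3 SDK. You can check which version is available in your environment by running dotnet --version.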
Step 1: Create an Azure Cosmos DB account
Create an Azure Cosmos DB for NoSQL account from the Azure portal, or use the Azure Cosmos DB emulator in place of an account.
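Step 2: Set up your .NET project
Open the Windows command prompt or a terminal window on your local computer. You run all the commands in the next sections from the command prompt or terminal. Run the following dotnet new command to create a new app named bulk-import-demo.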
dotnet new console -n bulk-import-demo
Change your directory to the newly created app folder. You can build the application with:
cd bulk-import-demo
dotnet build
The expected output from the build should look something like this:
Restore completed in 100.37 ms for C:\Users\user1\Downloads\CosmosDB_Samples\bulk-import-demo\bulk-import-demo.csproj.
bulk -> C:\Users\user1\Downloads\CosmosDB_Samples\bulk-import-demo\bin\Debug\netcoreapp2.2\bulk-import-demo.dll
Build succeeded.
0 Warning(s)
0 Error(s)
Time Elapsed 00:00:34.17
Step 3: Add the Azure Cosmos DB package
While still in the application directory, install the Azure Cosmos DB client library for .NET by using the dotnet add package command.
dotnet add package Microsoft.Azure.Cosmos
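Step 4: Get your Azure Cosmos DB account credentials
The sample application needs to authenticate to your Azure Cosmos DB account. To authenticate, pass the Azure Cosmos DB account credentials to the application. Get your Azure Cosmos DB account credentials by following these steps:
- Sign in to the Azure portal.
- Navigate to your Azure Cosmos DB account.
- Open the Keys pane and copy the URI and PRIMARY KEY of your account.
If you're using the Azure Cosmos DB emulator, obtain the emulator credentials from the emulator documentation.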
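The copied key grants full access to the account, so it is worth keeping out of source control. As a minimal sketch that is not part of the original sample, the endpoint and key could be read from environment variables instead. The variable names COSMOS_ENDPOINT and COSMOS_KEY and the CosmosSettings helper are hypothetical choices for illustration.

```csharp
using System;

// Hypothetical helper (not part of the tutorial sample): reads the account
// endpoint and key from environment variables instead of source code.
public static class CosmosSettings
{
    // Assumed variable names; use whatever fits your environment.
    public const string EndpointVariable = "COSMOS_ENDPOINT";
    public const string KeyVariable = "COSMOS_KEY";

    // Returns the value of a required environment variable, or throws a
    // descriptive error when the variable is missing or empty.
    public static string GetRequired(string name)
    {
        string value = Environment.GetEnvironmentVariable(name);
        if (string.IsNullOrWhiteSpace(value))
        {
            throw new InvalidOperationException(
                $"Environment variable '{name}' is not set.");
        }

        return value;
    }
}
```

The two values returned by CosmosSettings.GetRequired could then be passed to the CosmosClient constructor in place of hardcoded constants.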
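Step 5: Initialize the CosmosClient object with bulk execution support
Open the generated Program.cs file in a code editor. You create a new instance of CosmosClient with bulk execution enabled and use it to perform operations against Azure Cosmos DB.
Start by overwriting the default Main method and defining the global variables (class-level constants). These constants hold the endpoint and authorization key, the name of the database, the name of the container that you create, and the number of items that you insert in bulk. Make sure to replace the endpoint URL and authorization key values according to your environment.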
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq; // Needed later for FirstOrDefault in the error handling code
using System.Text.Json;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;

public class Program
{
    // Replace the endpoint and key placeholders with the values for your account
    private const string EndpointUrl = "https://<your-account>.documents.azure.cn:443/";
    private const string AuthorizationKey = "<your-account-key>";
    private const string DatabaseName = "bulk-tutorial";
    private const string ContainerName = "items";
    private const int AmountToInsert = 300000;

    static async Task Main(string[] args)
    {
    }
}
Inside the Main method, add the following code to initialize the CosmosClient object:
// Enable bulk execution so that concurrent operations are grouped into fewer service calls
CosmosClient cosmosClient = new CosmosClient(EndpointUrl, AuthorizationKey, new CosmosClientOptions() { AllowBulkExecution = true });
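Note
Once bulk execution is specified in the CosmosClientOptions, the options are effectively immutable for the lifetime of the CosmosClient. Changing the values has no effect.
After bulk execution is enabled, the CosmosClient internally groups concurrent operations into single service calls. This way, it optimizes throughput utilization by distributing service calls across partitions, and finally assigns the individual results to the original callers.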
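To make the grouping idea concrete, here is a conceptual sketch only. It is not the SDK's actual implementation: the BulkGroupingSketch class, the PartitionId field standing in for a physical partition, and the maxBatchSize parameter are all assumptions made for illustration. The sketch groups pending operations by partition and splits each group into batches, where each batch would correspond to one service call.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Conceptual illustration only; the real SDK logic is internal and differs.
public static class BulkGroupingSketch
{
    // Groups operations by a (hypothetical) partition identifier, then splits
    // each group into batches of at most maxBatchSize payloads.
    public static List<List<string>> CreateBatches(
        IEnumerable<(string PartitionId, string Payload)> operations,
        int maxBatchSize)
    {
        if (maxBatchSize <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(maxBatchSize));
        }

        var batches = new List<List<string>>();
        foreach (var group in operations.GroupBy(op => op.PartitionId))
        {
            List<string> payloads = group.Select(op => op.Payload).ToList();
            for (int i = 0; i < payloads.Count; i += maxBatchSize)
            {
                int size = Math.Min(maxBatchSize, payloads.Count - i);
                batches.Add(payloads.GetRange(i, size));
            }
        }

        return batches;
    }
}
```

With this shape, many small concurrent operations that target the same partition end up in a few larger calls, which is the effect the paragraph above describes.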
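You can then create a container to store all the items. Define /pk as the partition key, 50000 RU/s as the provisioned throughput, and a custom indexing policy that excludes all fields to optimize the write throughput. Add the following code after the CosmosClient initialization statement: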
// Create the container with a throughput of 50000 RU/s
// The indexing policy excludes all attributes to maximize RU/s usage
Database database = await cosmosClient.CreateDatabaseIfNotExistsAsync(Program.DatabaseName);

await database.DefineContainer(Program.ContainerName, "/pk")
        .WithIndexingPolicy()
            .WithIndexingMode(IndexingMode.Consistent)
            .WithIncludedPaths()
                .Attach()
            .WithExcludedPaths()
                .Path("/*")
                .Attach()
        .Attach()
    .CreateAsync(50000);
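As a rough back-of-the-envelope sketch of what 50000 RU/s means for this import: if every write cost a fixed number of request units, the shortest possible import time would be the total request units divided by the provisioned RU/s. The per-item cost used in the usage note (6 RU) is an assumed figure for illustration only. The real charge depends on item size and indexing, and the SDK reports it in the RequestCharge property of each response. ImportEstimate is a hypothetical helper name.

```csharp
using System;

// Hypothetical helper for a rough estimate; not part of the tutorial sample.
public static class ImportEstimate
{
    // Lower bound on the import time: total request units divided by the
    // provisioned request units per second.
    public static TimeSpan MinimumDuration(
        int itemCount, double ruPerWrite, int provisionedRuPerSecond)
    {
        if (provisionedRuPerSecond <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(provisionedRuPerSecond));
        }

        double totalRu = itemCount * ruPerWrite;
        return TimeSpan.FromSeconds(totalRu / provisionedRuPerSecond);
    }
}
```

For example, ImportEstimate.MinimumDuration(300000, 6.0, 50000) treats 300000 items at an assumed 6 RU each as 1,800,000 RU in total, which at 50000 RU/s gives a lower bound of 36 seconds. Client-side overhead and throttling can make a real run slower.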
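Step 6: Populate a list of concurrent tasks
To take advantage of bulk execution support, create a list of asynchronous tasks based on the source of data and the operations you want to perform, and use Task.WhenAll to execute them concurrently.
This tutorial uses the Bogus library to generate a list of items from the data model. In a real-world application, the items would come from your desired data source.
Start by adding the Bogus package to the project by using the dotnet add package command.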
dotnet add package Bogus
Define the model for the items that you want to save. Define the Item class within the Program.cs file:
public class Item
{
    public string id { get; set; }
    public string pk { get; set; }
    public string username { get; set; }
}
Next, create a helper function inside the Program class. This helper function takes the number of items that you defined to insert and generates random data:
private static IReadOnlyCollection<Item> GetItemsToInsert()
{
    return new Bogus.Faker<Item>()
        .StrictMode(true)
        // Generate item
        .RuleFor(o => o.id, f => Guid.NewGuid().ToString()) // id
        .RuleFor(o => o.username, f => f.Internet.UserName())
        .RuleFor(o => o.pk, (f, o) => o.id) // partition key
        .Generate(AmountToInsert);
}
Use the helper function to initialize the list of documents to work with:
IReadOnlyCollection<Item> itemsToInsert = Program.GetItemsToInsert();
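Next, use the list of documents to create concurrent tasks and populate the task list to insert the items into the container. To perform this operation, add the following code to the Main method of the Program class: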
Console.WriteLine("Starting...");
Stopwatch stopwatch = Stopwatch.StartNew();
try
{
    Container container = database.GetContainer(ContainerName);
    List<Task> tasks = new List<Task>(AmountToInsert);
    foreach (Item item in itemsToInsert)
    {
        // Start one create operation per item and log any failure
        tasks.Add(container.CreateItemAsync(item, new PartitionKey(item.pk))
            .ContinueWith(itemResponse =>
            {
                if (!itemResponse.IsCompletedSuccessfully)
                {
                    AggregateException innerExceptions = itemResponse.Exception.Flatten();
                    if (innerExceptions.InnerExceptions.FirstOrDefault(innerEx => innerEx is CosmosException) is CosmosException cosmosException)
                    {
                        Console.WriteLine($"Received {cosmosException.StatusCode} ({cosmosException.Message}).");
                    }
                    else
                    {
                        Console.WriteLine($"Exception {innerExceptions.InnerExceptions.FirstOrDefault()}.");
                    }
                }
            }));
    }

    // Wait until all are done
    await Task.WhenAll(tasks);

    stopwatch.Stop();
    Console.WriteLine($"Finished writing {AmountToInsert} items in {stopwatch.Elapsed}.");
}
catch (Exception ex)
{
    Console.WriteLine(ex);
}
finally
{
    // Delete the database so that the 50000 RU/s container stops accruing charges
    Console.WriteLine("Cleaning up resources...");
    await database.DeleteAsync();
}
All these concurrent point operations are executed together (that is, in bulk), as described in the introduction section.
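Because each continuation only logs a failure, Task.WhenAll completes without telling you how many items were actually written. The following self-contained sketch shows the same Task.WhenAll pattern with a thread-safe failure counter. The operation delegate stands in for container.CreateItemAsync, and ConcurrentImportSketch and RunAsync are hypothetical names that are not part of the SDK.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not part of the tutorial sample or the SDK).
public static class ConcurrentImportSketch
{
    // Starts one task per item, waits for all of them to finish, and returns
    // the number of operations that faulted or were canceled.
    public static async Task<int> RunAsync<T>(
        IEnumerable<T> items, Func<T, Task> operation)
    {
        int failures = 0;
        var tasks = new List<Task>();
        foreach (T item in items)
        {
            tasks.Add(operation(item).ContinueWith(completed =>
            {
                if (completed.Status != TaskStatus.RanToCompletion)
                {
                    // Continuations can run on different threads at once
                    Interlocked.Increment(ref failures);
                }
            }));
        }

        // The continuations never fault, so this waits without throwing
        await Task.WhenAll(tasks);
        return failures;
    }
}
```

In the tutorial code, the delegate could be item => container.CreateItemAsync(item, new PartitionKey(item.pk)), and the returned count could be printed next to the elapsed time.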
Step 7: Run the sample
To run the sample, use the dotnet command:
dotnet run
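Get the complete sample
If you didn't have time to complete the steps in this tutorial, or just want to download the code sample, you can get it from GitHub.
After cloning the project, make sure to update the desired credentials inside Program.cs.
You can run the sample by changing to the repository directory and using dotnet: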
cd cosmos-dotnet-bulk-import-throughput-optimizer
dotnet run
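Next steps
In this tutorial, you completed the following steps:
- Created an Azure Cosmos DB account
- Configured your project
- Connected to an Azure Cosmos DB account with bulk support enabled
- Performed a data import through concurrent create operations
You can now proceed to the next tutorial.
Trying to do capacity planning for a migration to Azure Cosmos DB? You can use information about your existing database cluster for capacity planning.
- If all you know is the number of vCores and servers in your existing database cluster, read about estimating request units using vCores or vCPUs.
- If you know typical request rates for your current database workload, read about estimating request units using the Azure Cosmos DB capacity planner.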