Bulk import data to an Azure Cosmos DB for NoSQL account by using the .NET SDK
APPLIES TO: NoSQL
This tutorial shows how to build a .NET console application that optimizes the provisioned throughput (RU/s) required to import data to Azure Cosmos DB.
In this article, you'll read data from a sample data source and import it into an Azure Cosmos DB container. This tutorial uses Version 3.0+ of the Azure Cosmos DB .NET SDK, which can be targeted to .NET Framework or .NET Core.
This tutorial covers:
- Creating an Azure Cosmos DB account
- Configuring your project
- Connecting to an Azure Cosmos DB account with bulk support enabled
- Performing a data import through concurrent create operations
Prerequisites
Before following the instructions in this article, make sure that you have the following resources:
- An active Azure account. If you don't have an Azure subscription, create a Trial before you begin.
You can create an Azure Cosmos DB free tier account, with the first 1000 RU/s and 25 GB of storage for free. You can also use the Azure Cosmos DB emulator with a URI of https://localhost:8081. For the key to use with the emulator, see Authenticating requests.
- .NET Core 3 SDK. You can verify which version is available in your environment by running dotnet --version.
Step 1: Create an Azure Cosmos DB account
Create an Azure Cosmos DB for NoSQL account from the Azure portal, or use the Azure Cosmos DB Emulator instead.
Step 2: Set up your .NET project
Open the Windows command prompt or a Terminal window from your local computer. You'll run all the commands in the next sections from the command prompt or terminal. Run the following dotnet new command to create a new app with the name bulk-import-demo.
dotnet new console -n bulk-import-demo
Change your directory to the newly created app folder. You can build the application with:
cd bulk-import-demo
dotnet build
The expected output from the build should look something like this:
Restore completed in 100.37 ms for C:\Users\user1\Downloads\CosmosDB_Samples\bulk-import-demo\bulk-import-demo.csproj.
bulk -> C:\Users\user1\Downloads\CosmosDB_Samples\bulk-import-demo \bin\Debug\netcoreapp2.2\bulk-import-demo.dll
Build succeeded.
0 Warning(s)
0 Error(s)
Time Elapsed 00:00:34.17
Step 3: Add the Azure Cosmos DB package
While still in the application directory, install the Azure Cosmos DB client library for .NET Core by using the dotnet add package command.
dotnet add package Microsoft.Azure.Cosmos
Step 4: Get your Azure Cosmos DB account credentials
The sample application needs to authenticate to your Azure Cosmos DB account. To authenticate, you should pass the Azure Cosmos DB account credentials to the application. Get your Azure Cosmos DB account credentials by following these steps:
- Sign in to the Azure portal.
- Navigate to your Azure Cosmos DB account.
- Open the Keys pane and copy the URI and PRIMARY KEY of your account.
If you're using the Azure Cosmos DB Emulator, obtain the emulator credentials from this article.
Step 5: Initialize the CosmosClient object with bulk execution support
Open the generated Program.cs file in a code editor. You'll create a new instance of CosmosClient with bulk execution enabled and use it to do operations against Azure Cosmos DB.
Let's start by overwriting the default Main method and defining the global variables. These global variables include the endpoint and authorization key, the names of the database and container that you'll create, and the number of items that you'll insert in bulk. Make sure to replace the endpoint URL and authorization key values according to your environment.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq; // Needed for FirstOrDefault in the error handling added in Step 6
using System.Text.Json;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
public class Program
{
    private const string EndpointUrl = "https://<your-account>.documents.azure.cn:443/";
    private const string AuthorizationKey = "<your-account-key>";
    private const string DatabaseName = "bulk-tutorial";
    private const string ContainerName = "items";
    private const int AmountToInsert = 300000;
    static async Task Main(string[] args)
    {
    }
}
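If you'd rather not keep the account key in source code, the two credential values can be read from the environment at startup. The following is a minimal sketch under stated assumptions, not part of the original sample: the variable names COSMOS_ENDPOINT and COSMOS_KEY and the CosmosSettingsSketch class are hypothetical names chosen for illustration.

```csharp
using System;

public static class CosmosSettingsSketch
{
    // Hypothetical variable names chosen for this sketch; the tutorial does not define them.
    private const string EndpointVariable = "COSMOS_ENDPOINT";
    private const string KeyVariable = "COSMOS_KEY";

    // Returns the value of a required environment variable, or throws with a clear message.
    public static string Require(string variableName)
    {
        var value = Environment.GetEnvironmentVariable(variableName);
        if (string.IsNullOrWhiteSpace(value))
        {
            throw new InvalidOperationException(
                $"Set the {variableName} environment variable before running the import.");
        }
        return value;
    }

    public static void Main()
    {
        try
        {
            string endpointUrl = Require(EndpointVariable);
            string authorizationKey = Require(KeyVariable);
            // Never print the key itself; its length is enough to confirm it was read.
            Console.WriteLine($"Endpoint: {endpointUrl}, key length: {authorizationKey.Length}");
        }
        catch (InvalidOperationException ex)
        {
            Console.WriteLine(ex.Message);
        }
    }
}
```

To use this approach in the tutorial's Program class, EndpointUrl and AuthorizationKey would have to become static readonly fields instead of const, because a const can't be assigned from a method call.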
Inside the Main method, add the following code to initialize the CosmosClient object:
CosmosClient cosmosClient = new CosmosClient(EndpointUrl, AuthorizationKey, new CosmosClientOptions() { AllowBulkExecution = true });
Note
Once bulk execution is specified in CosmosClientOptions, the options are effectively immutable for the lifetime of the CosmosClient. Changing the values will have no effect.
After bulk execution is enabled, the CosmosClient internally groups concurrent operations into single service calls. This way it optimizes throughput utilization by distributing service calls across partitions, and finally assigns individual results to the original callers.
You can then create a container to store all our items. Define /pk as the partition key, 50000 RU/s as provisioned throughput, and a custom indexing policy that will exclude all fields to optimize the write throughput. Add the following code after the CosmosClient initialization statement:
Database database = await cosmosClient.CreateDatabaseIfNotExistsAsync(Program.DatabaseName);
await database.DefineContainer(Program.ContainerName, "/pk")
        .WithIndexingPolicy()
            .WithIndexingMode(IndexingMode.Consistent)
            .WithIncludedPaths()
                .Attach()
            .WithExcludedPaths()
                .Path("/*")
                .Attach()
        .Attach()
    .CreateAsync(50000);
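To get a feel for what 50000 RU/s means for this import, you can estimate a rough lower bound on the run time: the total request units consumed by all writes divided by the provisioned RU/s. The sketch below is illustrative only. The value of 10 RU per write is an assumption made for the arithmetic, not a measured figure; the real charge depends on item size and indexing, and can be read from the RequestCharge property of each response. ThroughputEstimateSketch and EstimateMinimumSeconds are hypothetical names.

```csharp
using System;
using System.Globalization;

public static class ThroughputEstimateSketch
{
    // Rough lower bound: total RUs needed divided by the RUs available per second.
    public static double EstimateMinimumSeconds(
        int itemCount, double requestUnitsPerWrite, int provisionedRequestUnitsPerSecond)
    {
        if (provisionedRequestUnitsPerSecond <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(provisionedRequestUnitsPerSecond));
        }
        return itemCount * requestUnitsPerWrite / provisionedRequestUnitsPerSecond;
    }

    public static void Main()
    {
        // ASSUMPTION: 10 RU per write is a placeholder, not a measured value.
        const double assumedRequestUnitsPerWrite = 10.0;
        double seconds = EstimateMinimumSeconds(300000, assumedRequestUnitsPerWrite, 50000);
        // 300000 items * 10 RU / 50000 RU/s = 60 seconds
        Console.WriteLine("Lower bound: " + seconds.ToString("F0", CultureInfo.InvariantCulture) + " seconds");
    }
}
```

Under that assumption, the 300000 items of this tutorial would need at least about 60 seconds at 50000 RU/s. Client-side overhead and throttling retries would add to that figure.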
Step 6: Populate a list of concurrent tasks
To take advantage of the bulk execution support, create a list of asynchronous tasks based on the source of data and the operations you want to perform, and use Task.WhenAll to execute them concurrently.
Let's start by using the Bogus library to generate a list of items from our data model. In a real-world application, the items would come from your desired data source.
First, add the Bogus package to the solution by using the dotnet add package command.
dotnet add package Bogus
Define the items that you want to save. You need to define the Item class within the Program.cs file:
public class Item
{
    public string id {get;set;}
    public string pk {get;set;}
    public string username{get;set;}
}
Next, create a helper function inside the Program class. This helper function generates the number of items you defined to insert and fills them with random data:
private static IReadOnlyCollection<Item> GetItemsToInsert()
{
    return new Bogus.Faker<Item>()
    .StrictMode(true)
    //Generate item
    .RuleFor(o => o.id, f => Guid.NewGuid().ToString()) //id
    .RuleFor(o => o.username, f => f.Internet.UserName())
    .RuleFor(o => o.pk, (f, o) => o.id) //partitionkey
    .Generate(AmountToInsert);
}
Use the helper function to initialize a list of documents to work with:
IReadOnlyCollection<Item> itemsToInsert = Program.GetItemsToInsert();
Next, use the list of documents to create concurrent tasks and populate the task list to insert the items into the container. To perform this operation, add the following code to the Main method, after the list of items is initialized:
Container container = database.GetContainer(ContainerName);
List<Task> tasks = new List<Task>(AmountToInsert);
foreach (Item item in itemsToInsert)
{
    tasks.Add(container.CreateItemAsync(item, new PartitionKey(item.pk))
        .ContinueWith(itemResponse =>
        {
            if (!itemResponse.IsCompletedSuccessfully)
            {
                AggregateException innerExceptions = itemResponse.Exception.Flatten();
                if (innerExceptions.InnerExceptions.FirstOrDefault(innerEx => innerEx is CosmosException) is CosmosException cosmosException)
                {
                    Console.WriteLine($"Received {cosmosException.StatusCode} ({cosmosException.Message}).");
                }
                else
                {
                    Console.WriteLine($"Exception {innerExceptions.InnerExceptions.FirstOrDefault()}.");
                }
            }
        }));
}
// Wait until all are done
await Task.WhenAll(tasks);
All these concurrent point operations will be executed together (that is, in bulk), as described in the introduction section.
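The task-list pattern used in this step can be tried without an Azure Cosmos DB account. The following is a minimal sketch that assumes a simulated operation in place of container.CreateItemAsync: SimulatedCreateAsync, Counters, and ConcurrentTasksSketch are hypothetical names, and the rule that every tenth item fails exists only to exercise the failure branch. Unlike the tutorial code, it counts outcomes instead of only logging them.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ConcurrentTasksSketch
{
    // Holds the outcome counts; fields so that Interlocked can update them by reference.
    public sealed class Counters
    {
        public int Succeeded;
        public int Failed;
    }

    // Hypothetical stand-in for container.CreateItemAsync: every tenth item fails.
    private static async Task<int> SimulatedCreateAsync(int itemNumber)
    {
        await Task.Yield();
        if (itemNumber % 10 == 0)
        {
            throw new InvalidOperationException($"Simulated failure for item {itemNumber}.");
        }
        return itemNumber;
    }

    public static async Task<Counters> RunAsync(int amountToInsert)
    {
        Counters counters = new Counters();
        List<Task> tasks = new List<Task>(amountToInsert);
        for (int i = 1; i <= amountToInsert; i++)
        {
            // The continuation runs once per operation, whether it succeeded or faulted.
            tasks.Add(SimulatedCreateAsync(i).ContinueWith(operation =>
            {
                if (operation.IsCompletedSuccessfully)
                {
                    Interlocked.Increment(ref counters.Succeeded);
                }
                else
                {
                    // Reading Exception marks the failure as observed.
                    _ = operation.Exception;
                    Interlocked.Increment(ref counters.Failed);
                }
            }));
        }

        // Wait until all continuations are done.
        await Task.WhenAll(tasks);
        return counters;
    }

    public static async Task Main()
    {
        Counters counters = await RunAsync(100);
        Console.WriteLine($"Succeeded: {counters.Succeeded}, failed: {counters.Failed}.");
        // prints "Succeeded: 90, failed: 10."
    }
}
```

Because each continuation handles its own failure, Task.WhenAll doesn't throw here. That matches the tutorial code, where a failed insert is logged and the remaining operations keep running.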
Step 7: Run the sample
To run the sample, use the dotnet run command:
dotnet run
Get the complete sample
If you didn't have time to complete the steps in this tutorial, or just want to download the code samples, you can get it from GitHub.
After cloning the project, make sure to update the desired credentials inside Program.cs.
The sample can be run by changing to the repository directory and using dotnet run:
cd cosmos-dotnet-bulk-import-throughput-optimizer
dotnet run
Next steps
In this tutorial, you completed the following steps:
- Created an Azure Cosmos DB account
- Configured your project
- Connected to an Azure Cosmos DB account with bulk support enabled
- Performed a data import through concurrent create operations
You can now proceed to the next tutorial:
Trying to do capacity planning for a migration to Azure Cosmos DB? You can use information about your existing database cluster for capacity planning.
- If all you know is the number of vCores and servers in your existing database cluster, read about estimating request units using vCores or vCPUs
- If you know typical request rates for your current database workload, read about estimating request units using Azure Cosmos DB capacity planner