Bulk import data to an Azure Cosmos DB SQL API account by using the .NET SDK

APPLIES TO: SQL API

This tutorial shows how to build a .NET console application that optimizes the provisioned throughput (RU/s) required to import data to Azure Cosmos DB. In this article, you will read data from a sample data source and import it into an Azure Cosmos container. This tutorial uses Version 3.0+ of the Azure Cosmos DB .NET SDK, which can target .NET Framework or .NET Core.

This tutorial covers:

  • Creating an Azure Cosmos account
  • Configuring your project
  • Connecting to an Azure Cosmos account with bulk support enabled
  • Performing a data import through concurrent create operations

Prerequisites

Before following the instructions in this article, make sure that you have the following resources:

Step 1: Create an Azure Cosmos DB account

Create an Azure Cosmos DB SQL API account from the Azure portal, or create the account by using the Azure Cosmos DB Emulator.

Step 2: Set up your .NET project

Open a Windows command prompt or a terminal window on your local computer. You will run all the commands in the next sections from the command prompt or terminal. Run the following dotnet new command to create a new app named bulk-import-demo. The --langVersion parameter sets the LangVersion property in the created project file.

dotnet new console -langVersion:8 -n bulk-import-demo

Change your directory to the newly created app folder. You can build the application with:

cd bulk-import-demo
dotnet build

The expected output from the build should look something like this:

Restore completed in 100.37 ms for C:\Users\user1\Downloads\CosmosDB_Samples\bulk-import-demo\bulk-import-demo.csproj.
 bulk-import-demo -> C:\Users\user1\Downloads\CosmosDB_Samples\bulk-import-demo\bin\Debug\netcoreapp2.2\bulk-import-demo.dll

Build succeeded.
   0 Warning(s)
   0 Error(s)

Time Elapsed 00:00:34.17

Step 3: Add the Azure Cosmos DB package

While still in the application directory, install the Azure Cosmos DB client library for .NET Core by using the dotnet add package command.

dotnet add package Microsoft.Azure.Cosmos

Step 4: Get your Azure Cosmos account credentials

The sample application needs to authenticate to your Azure Cosmos account. To authenticate, pass the Azure Cosmos account credentials to the application. Get your Azure Cosmos account credentials by following these steps:

  1. Sign in to the Azure portal.
  2. Navigate to your Azure Cosmos account.
  3. Open the Keys pane and copy the URI and PRIMARY KEY of your account.

If you are using the Azure Cosmos DB Emulator, obtain the emulator credentials from this article.
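For example, when targeting the local emulator, the endpoint is a fixed localhost address and the key is the emulator's publicly documented, non-secret primary key. A minimal sketch of the constants used in the next step, assuming the default emulator configuration:

// Emulator defaults (assumed): fixed local endpoint and the well-known,
// publicly documented primary key shared by every emulator installation.
private const string EndpointUrl = "https://localhost:8081";
private const string AuthorizationKey =
    "C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==";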

Step 5: Initialize the CosmosClient object with bulk execution support

Open the generated Program.cs file in a code editor. You will create a new instance of CosmosClient with bulk execution enabled and use it to perform operations against Azure Cosmos DB.

Let's start by overwriting the default Main method and defining the global variables. These global variables include the endpoint and authorization keys, the names of the database and container that you will create, and the number of items that you will insert in bulk. Make sure to replace the endpoint URL and authorization key values according to your environment.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Text.Json;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;

public class Program
{
    private const string EndpointUrl = "https://<your-account>.documents.azure.cn:443/";
    private const string AuthorizationKey = "<your-account-key>";
    private const string DatabaseName = "bulk-tutorial";
    private const string ContainerName = "items";
    private const int ItemsToInsert = 300000;

    static async Task Main(string[] args)
    {

    }
}

Inside the Main method, add the following code to initialize the CosmosClient object:

CosmosClient cosmosClient = new CosmosClient(EndpointUrl, AuthorizationKey, new CosmosClientOptions() { AllowBulkExecution = true });

After bulk execution is enabled, the CosmosClient internally groups concurrent operations into single service calls. This way, it optimizes throughput utilization by distributing service calls across partitions, and finally assigning individual results to the original callers.
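If you prefer the fluent builder style, the same client can be configured through CosmosClientBuilder from the Microsoft.Azure.Cosmos.Fluent namespace; a minimal, equivalent sketch:

// Equivalent setup with the fluent builder (requires: using Microsoft.Azure.Cosmos.Fluent;)
CosmosClient cosmosClient = new CosmosClientBuilder(EndpointUrl, AuthorizationKey)
    .WithBulkExecution(true)
    .Build();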

You can then create a container to store all the items. Define /pk as the partition key, 50,000 RU/s as the provisioned throughput, and a custom indexing policy that excludes all fields to optimize the write throughput. Add the following code after the CosmosClient initialization statement:

Database database = await cosmosClient.CreateDatabaseIfNotExistsAsync(Program.DatabaseName);

await database.DefineContainer(Program.ContainerName, "/pk")
        .WithIndexingPolicy()
            .WithIndexingMode(IndexingMode.Consistent)
            .WithIncludedPaths()
                .Attach()
            .WithExcludedPaths()
                .Path("/*")
                .Attach()
        .Attach()
    .CreateAsync(50000);

Step 6: Populate a list of concurrent tasks

To take advantage of the bulk execution support, create a list of asynchronous tasks based on the source of the data and the operations you want to perform, and use Task.WhenAll to execute them concurrently. Let's start by using the "Bogus" package to generate a list of items from our data model. In a real-world application, the items would come from your desired data source.

First, add the Bogus package to the solution by using the dotnet add package command.

dotnet add package Bogus

Next, define the items that you want to save. You need to define the Item class within the Program.cs file:

public class Item
{
    public string id { get; set; }
    public string pk { get; set; }

    public string username { get; set; }
}

Next, create a helper function inside the Program class. This helper function takes the number of items you defined to insert and generates random data:

private static IReadOnlyCollection<Item> GetItemsToInsert()
{
    return new Bogus.Faker<Item>()
    .StrictMode(true)
    //Generate item
    .RuleFor(o => o.id, f => Guid.NewGuid().ToString()) //id
    .RuleFor(o => o.username, f => f.Internet.UserName())
    .RuleFor(o => o.pk, (f, o) => o.id) //partitionkey
    .Generate(ItemsToInsert);
}

Read the items and serialize them into stream instances by using the System.Text.Json class. Because of the nature of the autogenerated data, you are serializing the data as streams. You can also use the item instances directly, but by converting them to streams, you can take advantage of the performance of the stream APIs in the CosmosClient. Typically, you can use the data directly as long as you know the partition key.

To convert the data to stream instances, within the Main method, add the following code right after creating the container:

Dictionary<PartitionKey, Stream> itemsToInsert = new Dictionary<PartitionKey, Stream>(ItemsToInsert);
foreach (Item item in Program.GetItemsToInsert())
{
    MemoryStream stream = new MemoryStream();
    await JsonSerializer.SerializeAsync(stream, item);
    itemsToInsert.Add(new PartitionKey(item.pk), stream);
}

Next, use the data streams to create concurrent tasks and populate the task list to insert the items into the container. To perform this operation, add the following code to the Program class:

Container container = database.GetContainer(ContainerName);
List<Task> tasks = new List<Task>(ItemsToInsert);
foreach (KeyValuePair<PartitionKey, Stream> item in itemsToInsert)
{
    tasks.Add(container.CreateItemStreamAsync(item.Value, item.Key)
        .ContinueWith((Task<ResponseMessage> task) =>
        {
            using (ResponseMessage response = task.Result)
            {
                if (!response.IsSuccessStatusCode)
                {
                    Console.WriteLine($"Received {response.StatusCode} ({response.ErrorMessage}).");
                }
            }
        }));
}

// Wait until all are done
await Task.WhenAll(tasks);

All these concurrent point operations are executed together (that is, in bulk), as described in the introduction section.
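As noted earlier in this step, you can also skip the stream conversion and pass the typed items directly. A minimal sketch of that alternative, reusing the same Item class, helper function, and container (note that, unlike the stream API, CreateItemAsync surfaces failures as exceptions):

// Hedged alternative: typed point operations instead of streams.
List<Task> typedTasks = new List<Task>(ItemsToInsert);
foreach (Item item in Program.GetItemsToInsert())
{
    // CreateItemAsync serializes the item for you and throws a CosmosException on failure.
    typedTasks.Add(container.CreateItemAsync(item, new PartitionKey(item.pk)));
}

await Task.WhenAll(typedTasks);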

Step 7: Run the sample

To run the sample, simply use the dotnet command:

dotnet run
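The application inserts 300,000 items, so it can be useful to see how long the import takes. As an optional addition (System.Diagnostics is already imported in Step 5), you could wrap the Task.WhenAll call from Step 6 in a Stopwatch, for example:

Stopwatch stopwatch = Stopwatch.StartNew();

// ... create the tasks as shown in Step 6 ...
await Task.WhenAll(tasks);

stopwatch.Stop();
Console.WriteLine($"Inserted {ItemsToInsert} items in {stopwatch.Elapsed}.");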

Get the complete sample

If you didn't have time to complete the steps in this tutorial, or just want to download the code sample, you can get it from GitHub.

After cloning the project, make sure to update the desired credentials inside Program.cs.

The sample can be run by changing to the repository directory and using dotnet:

cd cosmos-dotnet-bulk-import-throughput-optimizer
dotnet run

Next steps

In this tutorial, you've done the following steps:

  • Created an Azure Cosmos account
  • Configured your project
  • Connected to an Azure Cosmos account with bulk support enabled
  • Performed a data import through concurrent create operations

You can now proceed to the next tutorial: