通过适用于 .NET 的 Batch 客户端库开始构建解决方案

在我们分步讨论 C# 示例应用程序时,了解本文中的 Azure Batch批处理 .NET 库的基础知识。 我们将探讨该示例应用程序如何利用批处理服务来处理云中的并行工作负荷,以及如何与 Azure 存储交互来暂存和检索文件。 你会了解常见的 Batch 应用程序工作流,并基本了解 Batch 的主要组件,例如作业、任务、池和计算节点。

Batch 解决方案工作流(基础)

先决条件

本文假定你有 C# 和 Visual Studio 的使用知识。 本文还假定,能够满足下面为 Azure 和 Batch 及存储服务指定的帐户创建要求。

帐户

Visual Studio

必须安装 Visual Studio 2017 才能构建示例项目。 可以在 Visual Studio 产品概述中找到免费试用版的 Visual Studio。

DotNetTutorial 代码示例

DotNetTutorial 示例是 GitHub 上的 azure-batch-samples 存储库中提供的众多批处理代码示例之一。 单击存储库主页上的“克隆或下载”>“下载 ZIP”,或单击azure-batch-samples-master.zip”直接下载链接,即可下载所有示例。 将 ZIP 文件的内容解压缩后,可在以下文件夹中找到该解决方案:

\azure-batch-samples\CSharp\ArticleProjects\DotNetTutorial

BatchLabs(可选)

BatchLabs 是一个功能丰富的免费独立客户端工具,可帮助创建、调试和监视 Azure Batch 应用程序。 尽管完成本教程不要求使用 Azure Batch 资源管理器,但可将其用于开发和调试 Batch 解决方案。

DotNetTutorial 示例项目概述

DotNetTutorial 代码示例是由以下两个项目组成的 Visual Studio 解决方案:DotNetTutorialTaskApplication

  • DotNetTutorial 是与 Batch 和存储服务交互,以在计算节点(虚拟机)上执行并行工作负荷的客户端应用程序。 DotNetTutorial 在本地工作站上运行。
  • TaskApplication 是在 Azure 中的计算节点上运行以执行实际工作的程序。 在本示例中, TaskApplication.exe 会分析从 Azure 存储下载的文件(输入文件)中的文本。 然后,它会生成一个文本文件(输出文件),其中包含出现在输入文件中的头三个单词的列表。 在创建输出文件以后,TaskApplication 会将文件上传到 Azure 存储。 这样该文件就可供客户端应用程序下载。 TaskApplication 在 Batch 服务中的多个计算节点上并行运行。

下图演示了客户端应用程序 DotNetTutorial 执行的主要操作,以及任务执行的应用程序 TaskApplication。 此基本工作流是通过 Batch 创建的许多计算解决方案中常见的工作流。 尽管它并未演示 Batch 服务提供的每项功能,但几乎每个 Batch 方案都包含此工作流的某些部分。

Batch 示例工作流

步骤 1. 在 Azure Blob 存储中创建容器
步骤 2. 将任务应用程序文件和输入文件上传到容器。
步骤 3. 创建批处理
    3a.StartTask 在节点加入池时将任务二进制文件 (TaskApplication) 下载到节点。
步骤 4. 创建批处理作业
步骤 5.任务添加到作业。
    5a. 任务计划在节点上执行。
    5b. 每项任务从 Azure 存储下载其输入数据,并开始执行。
步骤 6. 监视任务。
    6a. 当任务完成时,会将其输出数据上传到 Azure 存储。
步骤 7. 从存储空间下载任务输出。

如前所述,并非每个 Batch 解决方案都执行这些具体步骤,此类方案可能包含更多步骤,但 DotNetTutorial 示例应用程序将演示 Batch 方案中的常见过程。

构建 DotNetTutorial 示例项目

必须先在 DotNetTutorial 项目的 Program.cs 文件中指定批处理和存储帐户凭据才能成功运行该示例。 请在 Visual Studio 中双击 DotNetTutorial.sln 解决方案文件以打开该解决方案(如果尚未这样做)。 也可以在 Visual Studio 中使用“文件”>“打开”>“项目/解决方案”菜单打开它。

DotNetTutorial 项目中打开 Program.cs。 然后,添加在文件顶部附近指定的凭据:

// Update the Batch and Storage account credential strings below with the values
// unique to your accounts. These are used when constructing connection strings
// for the Batch and Storage client objects.

// Batch account credentials
private const string BatchAccountName = "";
private const string BatchAccountKey  = "";
private const string BatchAccountUrl  = "";

// Storage account credentials
private const string StorageAccountName = "";
private const string StorageAccountKey  = "";

可以在 Azure 门户中每项服务的帐户边栏选项卡中找到批处理和存储帐户凭据:

门户中的批处理凭据 门户中的存储凭据

使用凭据更新项目后,在“解决方案资源管理器”中右键单击该解决方案,并单击“构建解决方案”。 出现提示时,请确认还原任何 NuGet 包。

Tip

如果未自动还原 NuGet 包,或者看到了有关包还原失败的错误,请确保已安装 NuGet 包管理器, 然后启用遗失包的下载。 若要启用包下载,请参阅在构建期间启用包还原

在以下部分中,我们将示例应用程序细分为用于处理 Batch 服务中工作负荷的多个步骤,并详细讨论这些步骤。 建议在学习本文的余下部分时参考 Visual Studio 中打开的解决方案,因为我们并不会讨论示例中的每一行代码。

导航到 DotNetTutorial 项目的 Program.cs 文件中 MainAsync 方法的顶部,开始执行步骤 1。 以下每个步骤大致遵循 MainAsync中方法调用的进度。

步骤 1:创建存储容器

在 Azure 存储中创建容器

Batch 包含的内置支持支持与 Azure 存储交互。 存储帐户中的容器将为 Batch 帐户中运行的任务提供所需的文件。 这些容器还提供存储任务生成的输出数据所需的位置。 DotNetTutorial 客户端应用程序首先在 Azure Blob 存储中创建三个容器:

  • 应用程序:此容器用于存储任务所要运行的应用程序及其依赖项,例如 DLL。
  • 输入:任务会从输入容器下载所要处理的数据文件。
  • 输出:当任务完成输入文件的处理时,会将其结果上传到输出容器。

为了与存储帐户交互并创建容器,我们使用了用于 .NET 的 Azure 存储客户端库。 我们将创建包含 CloudStorageAccount 的帐户引用,并从中创建 CloudBlobClient

// Construct the Storage account connection string
string storageConnectionString = $"DefaultEndpointsProtocol=https;AccountName={StorageAccountName};AccountKey={StorageAccountKey};EndpointSuffix=core.chinacloudapi.cn"; 

// Retrieve the storage account
CloudStorageAccount storageAccount =
    CloudStorageAccount.Parse(storageConnectionString);

// Create the blob client, for use in obtaining references to
// blob storage containers
CloudBlobClient blobClient = storageAccount.CreateCloudBlobClient();

我们会在整个应用程序中使用 blobClient 引用,并将它作为参数传递给多个方法。 在紧随上述代码的代码块中提供了示例,我们在其中调用 CreateContainerIfNotExistAsync 以实际创建容器。

// Use the blob client to create the containers in Azure Storage if they don't
// yet exist
const string appContainerName    = "application";
const string inputContainerName  = "input";
const string outputContainerName = "output";
await CreateContainerIfNotExistAsync(blobClient, appContainerName);
await CreateContainerIfNotExistAsync(blobClient, inputContainerName);
await CreateContainerIfNotExistAsync(blobClient, outputContainerName);
private static async Task CreateContainerIfNotExistAsync(
    CloudBlobClient blobClient,
    string containerName)
{
        CloudBlobContainer container =
            blobClient.GetContainerReference(containerName);

        if (await container.CreateIfNotExistsAsync())
        {
                Console.WriteLine("Container [{0}] created.", containerName);
        }
        else
        {
                Console.WriteLine("Container [{0}] exists, skipping creation.",
                    containerName);
        }
}

创建容器之后,应用程序现在即可上传任务使用的文件。

Tip

如何通过 .NET 使用 Blob 存储对如何使用 Azure 存储容器和 Blob 做了全面的概述。 开始使用 Batch 时,它应该位于阅读列表顶部附近。

步骤 2:上传任务应用程序和数据文件

将任务应用程序和输入(数据)文件上传到容器

在文件上传操作中,DotNetTutorial 先定义应用程序和输入文件在本地计算机上的路径的集合, 然后将这些文件上传到上一步骤创建的容器。

// Paths to the executable and its dependencies that will be executed by the tasks
List<string> applicationFilePaths = new List<string>
{
    // The DotNetTutorial project includes a project reference to TaskApplication,
    // allowing us to determine the path of the task application binary dynamically
    typeof(TaskApplication.Program).Assembly.Location,
    "Microsoft.WindowsAzure.Storage.dll"
};

// The collection of data files that are to be processed by the tasks
List<string> inputFilePaths = new List<string>
{
    @"taskdata1.txt",
    @"taskdata2.txt",
    @"taskdata3.txt"
};

// Upload the application and its dependencies to Azure Storage. This is the
// application that will process the data files, and will be executed by each
// of the tasks on the compute nodes.
List<ResourceFile> applicationFiles = await UploadFilesToContainerAsync(
    blobClient,
    appContainerName,
    applicationFilePaths);

// Upload the data files. This is the data that will be processed by each of
// the tasks that are executed on the compute nodes within the pool.
List<ResourceFile> inputFiles = await UploadFilesToContainerAsync(
    blobClient,
    inputContainerName,
    inputFilePaths);

Program.cs 中有两个方法涉及到上传过程:

  • UploadFilesToContainerAsync:此方法返回 ResourceFile 对象的集合(下面将会介绍),并在内部调用 UploadFileToContainerAsync 上传在 filePaths 参数中传递的每个文件。
  • UploadFileToContainerAsync:这是实际执行文件上传并创建 ResourceFile 对象的方法。 上传文件后,它会获取该文件的共享访问签名 (SAS) 并返回代表它的 ResourceFile 对象。 下面也会介绍共享访问签名。
private static async Task<ResourceFile> UploadFileToContainerAsync(
    CloudBlobClient blobClient,
    string containerName,
    string filePath)
{
        Console.WriteLine(
            "Uploading file {0} to container [{1}]...", filePath, containerName);

        string blobName = Path.GetFileName(filePath);

        CloudBlobContainer container = blobClient.GetContainerReference(containerName);
        CloudBlockBlob blobData = container.GetBlockBlobReference(blobName);
        await blobData.UploadFromFileAsync(filePath);

        // Set the expiry time and permissions for the blob shared access signature.
        // In this case, no start time is specified, so the shared access signature
        // becomes valid immediately
        SharedAccessBlobPolicy sasConstraints = new SharedAccessBlobPolicy
        {
                SharedAccessExpiryTime = DateTime.UtcNow.AddHours(2),
                Permissions = SharedAccessBlobPermissions.Read
        };

        // Construct the SAS URL for blob
        string sasBlobToken = blobData.GetSharedAccessSignature(sasConstraints);
        string blobSasUri = $"{blobData.Uri}{sasBlobToken}";

        return new ResourceFile(blobSasUri, blobName);
}

ResourceFiles

ResourceFile 提供批处理中的任务,以及 Azure 存储中在任务运行之前下载到计算节点的文件的 URL。 ResourceFile.BlobSource 属性指定存在于 Azure 存储的文件的完整 URL。 该 URL 还可以包含用于对文件进行安全访问的共享访问签名 (SAS)。 Batch .NET 中的大多数任务类型都包含 ResourceFiles 属性,这些类型包括:

DotNetTutorial 示例应用程序不使用 JobPreparationTask 或 JobReleaseTask 任务类型,但可以通过在 Azure Batch 计算节点上运行作业准备和完成任务来详细了解这些任务类型。

共享访问签名 (SAS)

共享访问签名是一些字符串,包含为 URL 的一部分时,它们可以提供对 Azure 存储中容器和 Blob 的安全访问。 DotNetTutorial 应用程序使用 Blob 和容器共享访问签名 URL,并演示如何从存储空间服务获取这些共享访问签名字符串。

  • Blob 共享访问签名:DotNetTutorial 中池的 StartTask 在从存储空间下载应用程序二进制文件和输入数据文件时使用 Blob 共享访问签名(请参阅下面的步骤 3)。 DotNetTutorial 的 Program.cs 中的 UploadFileToContainerAsync 方法包含的代码可用于获取每个 blob 的共享访问签名。 它是通过调用 CloudBlob.GetSharedAccessSignature 来完成此操作的。
  • 容器共享访问签名:每个任务在计算节点上完成其工作后,会将其输出文件上传到 Azure 存储中的输出容器。 为此,TaskApplication 使用容器共享访问签名,在上传文件时,该共享访问签名提供对路径中包含的容器的写访问。 获取容器共享访问签名的操作方式类似于获取 blob 共享访问签名。 在 DotNetTutorial 中,会发现 GetContainerSasUrl 帮助器方法调用 CloudBlobContainer.GetSharedAccessSignature 来执行该操作。 可以在下面的“步骤 6:监视任务”中详细了解 TaskApplication 如何使用容器共享访问签名。

Tip

请查看有关共享访问签名的两篇系列教程的第 1 部分:了解共享访问签名 (SAS) 模型第 2 部分:创建共享访问签名 (SAS) 并将其用于 Blob 存储,详细了解如何提供对存储帐户中数据的安全访问。

步骤 3:创建 Batch 池

创建 Batch 池

Batch 是 Batch 执行作业任务时所在的计算节点(虚拟机)集合。

通过 Azure 存储 API 将应用程序和数据文件上传到存储帐户以后,DotNetTutorial 就会开始通过 Batch .NET 库提供的 API 调用 Batch 服务。 代码首先创建 BatchClient

BatchSharedKeyCredentials cred = new BatchSharedKeyCredentials(
    BatchAccountUrl,
    BatchAccountName,
    BatchAccountKey);

using (BatchClient batchClient = BatchClient.Open(cred))
{
    ...

然后,该示例会调用 CreatePoolIfNotExistsAsync 以在 Batch 帐户中创建计算节点池。 CreatePoolIfNotExistsAsync 使用 BatchClient.PoolOperations.CreatePool 方法在 Batch 服务中创建新池:

private static async Task CreatePoolIfNotExistAsync(BatchClient batchClient, string poolId, IList<ResourceFile> resourceFiles)
{
    CloudPool pool = null;
    try
    {
        Console.WriteLine("Creating pool [{0}]...", poolId);

        // Create the unbound pool. Until we call CloudPool.Commit() or CommitAsync(), no pool is actually created in the
        // Batch service. This CloudPool instance is therefore considered "unbound," and we can modify its properties.
        pool = batchClient.PoolOperations.CreatePool(
            poolId: poolId,
            targetDedicatedComputeNodes: 3,                                             // 3 compute nodes
            virtualMachineSize: "standard_d1_v2",               
        cloudServiceConfiguration: new CloudServiceConfiguration(osFamily: "5"));   // Windows Server 2016

        // Create and assign the StartTask that will be executed when compute nodes join the pool.
        // In this case, we copy the StartTask's resource files (that will be automatically downloaded
        // to the node by the StartTask) into the shared directory that all tasks will have access to.
        pool.StartTask = new StartTask
        {
            // Specify a command line for the StartTask that copies the task application files to the
            // node's shared directory. Every compute node in a Batch pool is configured with a number
            // of pre-defined environment variables that can be referenced by commands or applications
            // run by tasks.

            // Since a successful execution of robocopy can return a non-zero exit code (e.g. 1 when one or
            // more files were successfully copied) we need to manually exit with a 0 for Batch to recognize
            // StartTask execution success.
            CommandLine = "cmd /c (robocopy %AZ_BATCH_TASK_WORKING_DIR% %AZ_BATCH_NODE_SHARED_DIR%) ^& IF %ERRORLEVEL% LEQ 1 exit 0",
            ResourceFiles = resourceFiles,
            WaitForSuccess = true
        };

        await pool.CommitAsync();
    }
    catch (BatchException be)
    {
        // Swallow the specific error code PoolExists since that is expected if the pool already exists
        if (be.RequestInformation?.BatchError != null && be.RequestInformation.BatchError.Code == BatchErrorCodeStrings.PoolExists)
        {
            Console.WriteLine("The pool {0} already existed when we tried to create it", poolId);
        }
        else
        {
            throw; // Any other exception is unexpected
        }
    }
}

使用 CreatePool 创建池时,需指定多个参数,例如计算节点数目、节点大小以及节点的操作系统。 在 DotNetTutorial 中,我们使用 CloudServiceConfiguration云服务指定 Windows Server 2012 R2。

也可通过为池指定 VirtualMachineConfiguration 来创建是 Azure 虚拟机 (VM) 的计算节点的池。 可以根据 Windows 或 Linux 映像创建 VM 计算节点池。 VM 映像的源可以是下述任意一种:

Important

需要支付 Batch 中计算资源的费用。 若要将费用降到最低,可以在运行示例之前,将 targetDedicatedComputeNodes 降为 1。

也可以连同这些实体节点属性一起指定池的 StartTask 。 StartTask 在每个节点加入池以及每次重新启动节点时在该节点上运行。 StartTask 特别适合用于在任务执行之前在计算节点上安装应用程序。 例如,如果任务使用 Python 脚本处理数据,则可以使用 StartTask 在计算节点上安装 Python。

在此示例应用程序中,StartTask 将它从存储中下载的文件(使用 StartTask.ResourceFiles 属性指定),从 StartTask 工作目录复制到在节点上运行的所有任务可以访问的共享目录。 本质上,这会在节点加入池时,将 TaskApplication.exe 及其依赖项复制到每个节点上的共享目录,因此该节点上运行的任何任务都可以访问它。

Tip

Azure Batch 的应用程序包功能提供另一种方法,用于将应用程序转移到池中的计算节点。 有关详细信息,请参阅使用 Batch 应用程序包将应用程序部署到计算节点

此外,在上述代码片段中,值得注意的问题是,StartTask 的 CommandLine 属性中使用了两个环境变量:%AZ_BATCH_TASK_WORKING_DIR%%AZ_BATCH_NODE_SHARED_DIR%。 自动为 Batch 池中的每个计算节点配置多个特定于 Batch 的环境变量。 由任务执行的任何进程都可以访问这些环境变量。

Tip

若要深入了解批处理池中计算节点上可用的环境变量,以及有关任务工作目录的信息,请参阅面向开发人员的批处理功能概述中的任务的环境设置文件和目录部分。

步骤 4:创建 Batch 作业

创建 Batch 作业

Batch 作业 是任务的集合,它与计算节点池相关联。 作业中的任务在关联池的计算节点上执行。

不仅可以使用作业来组织和跟踪相关工作负荷中的任务,也可以使用它来实施特定的约束,例如作业(并扩展到其任务)的最大运行时,以及 Batch 帐户中其他作业的相关作业优先级。 不过在本示例中,该作业仅与步骤 3 中创建的池关联。 未配置任何其他属性。

所有 Batch 作业都与特定的池关联。 此关联指示将要在其上执行作业任务的节点。 可以通过 CloudJob.PoolInformation 属性进行这方面的指定,如下面的代码片段所示。

private static async Task CreateJobAsync(
    BatchClient batchClient,
    string jobId,
    string poolId)
{
    Console.WriteLine("Creating job [{0}]...", jobId);

    CloudJob job = batchClient.JobOperations.CreateJob();
    job.Id = jobId;
    job.PoolInformation = new PoolInformation { PoolId = poolId };

    await job.CommitAsync();
}

创建作业后,可以添加任务来执行工作。

步骤 5:将任务添加到作业

将任务添加到作业
(1) 将任务添加到作业;(2) 将任务计划为在节点上运行;(3) 任务下载要处理的数据文件

Batch 任务 是在计算节点上执行的各个工作单位。 任务有一个命令行,可运行在该命令行中指定的脚本或可执行文件。

要实际执行工作,必须将任务添加到作业。 每个 CloudTask 都是使用命令行属性以及任务在其命令行自动执行前下载到节点的 ResourceFiles(如同池的 StartTask)进行配置的。 在 DotNetTutorial 示例项目中,每个任务只处理一个文件。 因此,其 ResourceFiles 集合包含单个元素。

private static async Task<List<CloudTask>> AddTasksAsync(
    BatchClient batchClient,
    string jobId,
    List<ResourceFile> inputFiles,
    string outputContainerSasUrl)
{
    Console.WriteLine("Adding {0} tasks to job [{1}]...", inputFiles.Count, jobId);

    // Create a collection to hold the tasks that we'll be adding to the job
    List<CloudTask> tasks = new List<CloudTask>();

    // Create each of the tasks. Because we copied the task application to the
    // node's shared directory with the pool's StartTask, we can access it via
    // the shared directory on the node that the task runs on.
    foreach (ResourceFile inputFile in inputFiles)
    {
        string taskId = "topNtask" + inputFiles.IndexOf(inputFile);
        string taskCommandLine = $"cmd /c %AZ_BATCH_NODE_SHARED_DIR%\\TaskApplication.exe {inputFile.FilePath} 3 \"{outputContainerSasUrl}\"";

        CloudTask task = new CloudTask(taskId, taskCommandLine);
        task.ResourceFiles = new List<ResourceFile> { inputFile };
        tasks.Add(task);
    }

    // Add the tasks as a collection, as opposed to issuing a separate AddTask call
    // for each. Bulk task submission helps to ensure efficient underlying API calls
    // to the Batch service.
    await batchClient.JobOperations.AddTaskAsync(jobId, tasks);

    return tasks;
}

Important

在访问环境变量(例如 %AZ_BATCH_NODE_SHARED_DIR%)或执行节点的 PATH 中找不到的应用程序时,任务命令行必须带有 cmd /c 前缀。 这样才可以显式执行命令解释器,并指示其在执行命令后终止操作。 如果任务在节点的 PATH 中执行应用程序(例如 robocopy.exepowershell.exe),而且未使用任何环境变量,则不必要满足此要求。

在上述代码片段中的 foreach 循环内,可以看到已构造任务的命令行,因此有三个命令行参数已传递到 TaskApplication.exe

  1. 第一个参数 是要处理的文件的路径。 这是节点上现有文件的本地路径。 首次创建上面 UploadFileToContainerAsync 中的 ResourceFile 对象时,会将文件名用于此属性(作为 ResourceFile 构造函数的参数)。 这意味着可以在 TaskApplication.exe所在的目录中找到此文件。
  2. 第二个参数指定应将前 N 个单词写入输出文件。 在示例中,此参数已经过硬编码,因此会将前 3 个单词写入输出文件。
  3. 第三个参数是共享访问签名 (SAS),提供对 Azure 存储中输出容器的写访问。 TaskApplication.exe 使用此共享访问签名 URL。 可以在 TaskApplication 项目的 Program.cs 文件的 UploadFileToContainer 方法中找到此方面的代码:
// NOTE: From project TaskApplication Program.cs

private static void UploadFileToContainer(string filePath, string containerSas)
{
        string blobName = Path.GetFileName(filePath);

        // Obtain a reference to the container using the SAS URI.
        CloudBlobContainer container = new CloudBlobContainer(new Uri(containerSas));

        // Upload the file (as a new blob) to the container
        try
        {
                CloudBlockBlob blob = container.GetBlockBlobReference(blobName);
                blob.UploadFromFile(filePath);

                Console.WriteLine("Write operation succeeded for SAS URL " + containerSas);
                Console.WriteLine();
        }
        catch (StorageException e)
        {

                Console.WriteLine("Write operation failed for SAS URL " + containerSas);
                Console.WriteLine("Additional error information: " + e.Message);
                Console.WriteLine();

                // Indicate that a failure has occurred so that when the Batch service
                // sets the CloudTask.ExecutionInformation.ExitCode for the task that
                // executed this application, it properly indicates that there was a
                // problem with the task.
                Environment.ExitCode = -1;
        }
}

步骤 6:监视任务

监视任务
客户端应用程序会:(1) 监视任务的完成和成功状态;(2) 监视将结果数据上传到 Azure 存储的任务

任务在添加到作业后,自动排入队列并计划在与作业关联的池中的计算节点上执行。 根据你指定的设置,Batch 将处理所有任务排队、计划、重试和其他任务管理工作。

监视任务的执行有许多方法。 DotNetTutorial 显示了一个简单的示例,该示例只报告完成状态以及任务的失败或成功状态。 DotNetTutorial 的 Program.cs 中的 MonitorTasks 方法内有三个 Batch .NET 概念值得讨论。 下面按出现顺序列出了这些概念:

  1. ODATADetailLevel:必须在列出操作(例如获取作业的任务列表)中指定 ODATADetailLevel,确保批处理应用程序的性能。 若打算在 Batch 应用程序中进行任何类型的状态监视,请将 有效地查询 Azure Batch 服务 加入阅读列表。
  2. TaskStateMonitorTaskStateMonitor 为批处理 .NET 应用程提供用于监视任务状态的帮助器实用工具。 在 MonitorTasks 中,DotNetTutorial 将等待所有任务在时限内达到 TaskState.Completed, 然后终止作业。
  3. TerminateJobAsync:通过 JobOperations.TerminateJobAsync 终止作业(或阻止 JobOperations.TerminateJob)会将该作业标记为已完成。 如果批处理解决方案使用 JobReleaseTask,则这样做很重要。 这是一种特殊类型的任务,在 作业准备与完成任务中有说明。

DotNetTutorialProgram.cs 中的 MonitorTasks 方法如下所示:

private static async Task<bool> MonitorTasks(
    BatchClient batchClient,
    string jobId,
    TimeSpan timeout)
{
    bool allTasksSuccessful = true;
    const string successMessage = "All tasks reached state Completed.";
    const string failureMessage = "One or more tasks failed to reach the Completed state within the timeout period.";

    // Obtain the collection of tasks currently managed by the job. Note that we use
    // a detail level to  specify that only the "id" property of each task should be
    // populated. Using a detail level for all list operations helps to lower
    // response time from the Batch service.
    ODATADetailLevel detail = new ODATADetailLevel(selectClause: "id");
    List<CloudTask> tasks =
        await batchClient.JobOperations.ListTasks(JobId, detail).ToListAsync();

    Console.WriteLine("Awaiting task completion, timeout in {0}...",
        timeout.ToString());

    // We use a TaskStateMonitor to monitor the state of our tasks. In this case, we
    // will wait for all tasks to reach the Completed state.
    TaskStateMonitor taskStateMonitor
        = batchClient.Utilities.CreateTaskStateMonitor();

    try
    {
        await taskStateMonitor.WhenAll(tasks, TaskState.Completed, timeout);
    }
    catch (TimeoutException)
    {
        await batchClient.JobOperations.TerminateJobAsync(jobId, failureMessage);
        Console.WriteLine(failureMessage);
        return false;
    }

    await batchClient.JobOperations.TerminateJobAsync(jobId, successMessage);

    // All tasks have reached the "Completed" state, however, this does not
    // guarantee all tasks completed successfully. Here we further check each task's
    // ExecutionInfo property to ensure that it did not encounter a failure
    // or return a non-zero exit code.

    // Update the detail level to populate only the task id and executionInfo
    // properties. We refresh the tasks below, and need only this information for
    // each task.
    detail.SelectClause = "id, executionInfo";

    foreach (CloudTask task in tasks)
    {
        // Populate the task's properties with the latest info from the Batch service
        await task.RefreshAsync(detail);

        if (task.ExecutionInformation.Result == TaskExecutionResult.Failure)
        {
            // A task with failure information set indicates there was a problem with the task. It is important to note that
            // the task's state can be "Completed," yet still have encountered a failure.

            allTasksSuccessful = false;

            Console.WriteLine("WARNING: Task [{0}] encountered a failure: {1}", task.Id, task.ExecutionInformation.FailureInformation.Message);
            if (task.ExecutionInformation.ExitCode != 0)
            {
                // A non-zero exit code may indicate that the application executed by the task encountered an error
                // during execution. As not every application returns non-zero on failure by default (e.g. robocopy),
                // your implementation of error checking may differ from this example.

                Console.WriteLine("WARNING: Task [{0}] returned a non-zero exit code - this may indicate task execution or completion failure.", task.Id);
            }
        }
    }

    if (allTasksSuccessful)
    {
        Console.WriteLine("Success! All tasks completed successfully within the specified timeout period.");
    }

    return allTasksSuccessful;
}

步骤 7:下载任务输出

从存储空间下载任务输出

完成作业后,可以从 Azure 存储下载任务的输出。 可以在 DotNetTutorialProgram.cs 中调用 DownloadBlobsFromContainerAsync 来实现此目的:

private static async Task DownloadBlobsFromContainerAsync(
    CloudBlobClient blobClient,
    string containerName,
    string directoryPath)
{
        Console.WriteLine("Downloading all files from container [{0}]...", containerName);

        // Retrieve a reference to a previously created container
        CloudBlobContainer container = blobClient.GetContainerReference(containerName);

        // Get a flat listing of all the block blobs in the specified container
        foreach (IListBlobItem item in container.ListBlobs(
                    prefix: null,
                    useFlatBlobListing: true))
        {
                // Retrieve reference to the current blob
                CloudBlob blob = (CloudBlob)item;

                // Save blob contents to a file in the specified folder
                string localOutputFile = Path.Combine(directoryPath, blob.Name);
                await blob.DownloadToFileAsync(localOutputFile, FileMode.Create);
        }

        Console.WriteLine("All files downloaded to {0}", directoryPath);
}

Note

DotNetTutorial 应用程序中 DownloadBlobsFromContainerAsync 的调用可以指定应将文件下载到 %TEMP% 文件夹。 可以随意修改此输出位置。

步骤 8:删除容器

由于用户需要对位于 Azure 存储中的数据付费,因此我们建议删除批处理作业不再需要的 Blob。 在 DotNetTutorial 的 Program.cs 中,调用帮助器方法 DeleteContainerAsync 三次即可实现此目的:

// Clean up Storage resources
await DeleteContainerAsync(blobClient, appContainerName);
await DeleteContainerAsync(blobClient, inputContainerName);
await DeleteContainerAsync(blobClient, outputContainerName);

该方法本身只获取对容器的引用,并调用 CloudBlobContainer.DeleteIfExistsAsync

private static async Task DeleteContainerAsync(
    CloudBlobClient blobClient,
    string containerName)
{
    CloudBlobContainer container = blobClient.GetContainerReference(containerName);

    if (await container.DeleteIfExistsAsync())
    {
        Console.WriteLine("Container [{0}] deleted.", containerName);
    }
    else
    {
        Console.WriteLine("Container [{0}] does not exist, skipping deletion.",
            containerName);
    }
}

步骤 9:删除作业和池

在最后一个步骤中,系统会提示用户删除 DotNetTutorial 应用程序创建的作业和池。 虽然作业和任务本身不收费,但计算节点收费。 因此,建议只在需要的时候分配节点。 在维护过程中,可能需要删除未使用的池。

BatchClient 的 JobOperationsPoolOperations 都有对应的删除方法(在用户确认删除时调用):

// Clean up the resources we've created in the Batch account if the user so chooses
Console.WriteLine();
Console.WriteLine("Delete job? [yes] no");
string response = Console.ReadLine().ToLower();
if (response != "n" && response != "no")
{
    await batchClient.JobOperations.DeleteJobAsync(JobId);
}

Console.WriteLine("Delete pool? [yes] no");
response = Console.ReadLine();
if (response != "n" && response != "no")
{
    await batchClient.PoolOperations.DeletePoolAsync(PoolId);
}

Important

请记住,需要支付计算资源的费用,删除未使用的池可将费用降到最低。 另请注意,删除池也会删除该池内的所有计算节点,并且删除池后,无法恢复节点上的任何数据。

运行 DotNetTutorial 示例

运行示例应用程序时,控制台输出如下所示。 在执行期间启动池的计算节点时,会遇到暂停并看到 Awaiting task completion, timeout in 00:30:00...。 在执行期间和之后,可以使用 Azure 门户 监视池、计算节点、作业和任务。 使用 Azure 门户Azure 存储资源管理器可以查看应用程序创建的存储资源(容器和 Blob)。

以默认配置运行应用程序时,典型的执行时间 大约为 5 分钟

Sample start: 1/8/2016 09:42:58 AM

Container [application] created.
Container [input] created.
Container [output] created.
Uploading file C:\repos\azure-batch-samples\CSharp\ArticleProjects\DotNetTutorial\bin\Debug\TaskApplication.exe to container [application]...
Uploading file Microsoft.WindowsAzure.Storage.dll to container [application]...
Uploading file ..\..\taskdata1.txt to container [input]...
Uploading file ..\..\taskdata2.txt to container [input]...
Uploading file ..\..\taskdata3.txt to container [input]...
Creating pool [DotNetTutorialPool]...
Creating job [DotNetTutorialJob]...
Adding 3 tasks to job [DotNetTutorialJob]...
Awaiting task completion, timeout in 00:30:00...
Success! All tasks completed successfully within the specified timeout period.
Downloading all files from container [output]...
All files downloaded to C:\Users\USERNAME\AppData\Local\Temp
Container [application] deleted.
Container [input] deleted.
Container [output] deleted.

Sample end: 1/8/2016 09:47:47 AM
Elapsed time: 00:04:48.5358142

Delete job? [yes] no: yes
Delete pool? [yes] no: yes

Sample complete, hit ENTER to exit...

后续步骤

可以随意更改 DotNetTutorialTaskApplication,体验不同的计算方案。 例如,尝试通过某个方法(例如使用 Thread.Sleep)将执行延迟添加到 TaskApplication,模拟长时间运行的任务并在门户中监视这些任务。 尝试添加更多任务,或调整计算节点的数目。 添加逻辑来检查并允许使用现有的池加速执行时间(提示:请查看 azure-batch-samplesMicrosoft.Azure.Batch.Samples.Common 项目的 ArticleHelpers.cs)。

熟悉 Batch 解决方案的基本工作流后,接下来可以深入了解 Batch 服务的其他功能。

  • 如果对 Batch 服务不熟悉,建议查看 Azure Batch 功能概述 一文。
  • 批处理学习路径有关开发的深度知识下面列出的其他批处理开发文章着手。
  • 通过 TopNWords 示例了解有关使用 Batch 处理“前 N 个单词”工作负荷的不同实现方式。
  • 查看 Batch .NET 发行说明,了解库中的最新更改。