适用于 Python 的 Batch SDK 入门

在介绍以 Python 编写的小型 Batch 应用程序时,我们了解了 Azure Batch批处理 Python 客户端的基础知识。 我们将探讨两个示例脚本如何使用 Batch 服务来处理云中 Linux 虚拟机上的并行工作负荷,以及这些脚本如何与 Azure 存储 交互来暂存和检索文件。 你会了解常见的 Batch 应用程序工作流,并基本了解 Batch 的主要组件,例如作业、任务、池和计算节点。

Batch 解决方案工作流(基础)

先决条件

本文假设你有 Python 的实践知识,并熟悉 Linux。 本文还假定,能够满足下面为 Azure 和 Batch 及存储服务指定的帐户创建要求。

帐户

代码示例

Python 教程代码示例是 GitHub 上的 azure-batch-samples 存储库中提供的众多批处理代码示例之一。 单击存储库主页上的“克隆或下载”>“下载 ZIP”,或单击 azure-batch-samples-master.zip 直接下载链接,即可下载所有示例。 解压缩 ZIP 文件的内容后,在 article_samples 目录中可找到本教程的两个脚本:

/azure-batch-samples/Python/Batch/article_samples/python_tutorial_client.py
/azure-batch-samples/Python/Batch/article_samples/python_tutorial_task.py

Python 环境

若要在本地工作站上运行 python_tutorial_client.py 示例脚本,需要与版本 2.73.3+ 兼容的 Python 解释程序。 此脚本已在 Linux 和 Windows 上测试。

加密依赖项

必须为加密库安装 azure-batchazure-storage Python 包所需的依赖项。 根据平台执行以下操作之一,或参阅加密程序安装获取详细信息:

  • Ubuntu

    apt-get update && apt-get install -y build-essential libssl-dev libffi-dev libpython-dev python-dev

  • CentOS

    yum update && yum install -y gcc openssl-devel libffi-devel python-devel

  • SLES/OpenSUSE

    zypper ref && zypper -n in libopenssl-dev libffi48-devel python-devel

  • Windows

    pip install cryptography

Note

如果在 Linux 上安装 Python 3.3+,请使用 Python 依赖项的 python3 对等项。 例如,在 Ubuntu 上为: apt-get update && apt-get install -y build-essential libssl-dev libffi-dev libpython3-dev python3-dev

Azure 包

接下来,安装 Azure BatchAzure 存储 Python 包。 可以使用以下位置提供的 piprequirements.txt 安装这两个包:

/azure-batch-samples/Python/Batch/requirements.txt

发出以下 pip 命令以安装 Batch 和存储包:

pip install -r requirements.txt

或者,可以手动方式安装 azure-batchazure-storage Python 包:

pip install azure-batch
pip install azure-storage

Tip

如果使用无特权帐户,可能需要在命令前面加上 sudo。 例如,sudo pip install -r requirements.txt。 若要深入了解如何安装 Python 包,请参阅 python.org 中的 Installing Packages (安装包)。

Batch Python 教程代码示例

Batch Python 教程代码示例由两个 Python 脚本和若干数据文件组成。

  • python_tutorial_client.py:与批处理和存储服务交互,在计算节点(虚拟机)上执行并行工作负荷。 python_tutorial_client.py 脚本在本地工作站上运行。
  • python_tutorial_task.py:在 Azure 中的计算节点上运行,执行实际工作的脚本。 在本示例中,python_tutorial_task.py 分析从 Azure 存储下载的文件(输入文件)中的文本。 然后,它会生成一个文本文件(输出文件),其中包含出现在输入文件中的头三个单词的列表。 创建输出文件后,python_tutorial_task.py 会将该文件上传到 Azure 存储。 这样,便可以将文件下载到工作站上运行的客户端脚本。 python_tutorial_task.py 脚本在批处理服务中的多个计算节点上并行运行。
  • ./data/taskdata*.txt:这三个文本文件为计算节点上运行的任务提供输入。

下图演示了客户端和任务脚本执行的主要操作。 此基本工作流是通过 Batch 创建的许多计算解决方案中常见的工作流。 尽管它并未演示 Batch 服务提供的每项功能,但几乎每个 Batch 方案都包含此工作流的某些部分。

Batch 示例工作流

步骤 1. 在 Azure Blob 存储中创建容器
步骤 2. 将任务脚本和输入文件上传到容器。
步骤 3. 创建批处理
    3a.StartTask 在节点加入池时会任务脚本 (python_tutorial_task.py) 下载到节点。
步骤 4. 创建批处理作业
步骤 5.任务添加到作业。
    5a. 任务计划在节点上执行。
    5b. 每项任务从 Azure 存储下载其输入数据,并开始执行。
步骤 6. 监视任务。
    6a. 当任务完成时,会将其输出数据上传到 Azure 存储。
步骤 7. 从存储空间下载任务输出。

如前所述,并非每个 Batch 解决方案都会执行这些具体步骤,此类方案可能包含更多步骤,但本示例演示 Batch 方案中的常见过程。

准备客户端脚本

在运行示例之前,请将批处理和存储帐户凭据添加到 python_tutorial_client.py。 如果尚未这样做,请在偏好的编辑器中打开此文件,并使用凭据更新以下代码行。

# Update the Batch and Storage account credential strings below with the values
# unique to your accounts. These are used when constructing connection strings
# for the Batch and Storage client objects.

# Batch account credentials
BATCH_ACCOUNT_NAME = ""
BATCH_ACCOUNT_KEY = ""
BATCH_ACCOUNT_URL = ""

# Storage account credentials
STORAGE_ACCOUNT_NAME = ""
STORAGE_ACCOUNT_KEY = ""

可以在 Azure 门户中每项服务的帐户边栏选项卡中找到批处理和存储帐户凭据:

门户中的批处理凭据 门户中的存储凭据

在以下部分中,我们将分析脚本处理 Batch 服务中的工作负荷使用的步骤。 建议在执行本文余下部分所述的步骤时,经常在编辑器中查看脚本。

导航到 python_tutorial_client.py 中的以下行以开始执行步骤 1:

if __name__ == '__main__':

步骤 1:创建存储容器

在 Azure 存储中创建容器

Batch 包含的内置支持支持与 Azure 存储交互。 存储帐户中的容器将为 Batch 帐户中运行的任务提供所需的文件。 这些容器还提供存储任务生成的输出数据所需的位置。 python_tutorial_client.py 脚本执行的第一个操作是在 Azure Blob 存储中创建三个容器:

  • 应用程序:此容器存储任务运行的 Python 脚本 python_tutorial_task.py
  • 输入:任务会从输入容器下载所要处理的数据文件。
  • 输出: 当任务完成输入文件的处理时,会将其结果上传到 输出 容器。

为了与存储帐户交互并创建容器,我们将使用 azure-storage 包来创建 BlockBlobService 对象 -“Blob 客户端”。 然后,使用 Blob 客户端在存储帐户中创建三个容器。

import azure.storage.blob as azureblob

# Create the blob client, for use in obtaining references to
# blob storage containers and uploading files to containers.
blob_client = azureblob.BlockBlobService(
    account_name=STORAGE_ACCOUNT_NAME,
    account_key=STORAGE_ACCOUNT_KEY)

# Use the blob client to create the containers in Azure Storage if they
# don't yet exist.
APP_CONTAINER_NAME = 'application'
INPUT_CONTAINER_NAME = 'input'
OUTPUT_CONTAINER_NAME = 'output'
blob_client.create_container(APP_CONTAINER_NAME, fail_on_exist=False)
blob_client.create_container(INPUT_CONTAINER_NAME, fail_on_exist=False)
blob_client.create_container(OUTPUT_CONTAINER_NAME, fail_on_exist=False)

创建容器之后,应用程序现在即可上传任务使用的文件。

Tip

如何通过 Python 使用 Azure Blob 存储对如何使用 Azure 存储容器和 Blob 做了全面的概述。 开始使用 Batch 时,它应该位于阅读列表顶部附近。

步骤 2:上传任务脚本和数据文件

将任务应用程序和输入(数据)文件上传到容器

在文件上传操作中,python_tutorial_client.py 先定义应用程序和输入文件在本地计算机上的路径的集合, 然后将这些文件上传到上一步骤创建的容器。

# Paths to the task script. This script will be executed by the tasks that
# run on the compute nodes.
application_file_paths = [os.path.realpath('python_tutorial_task.py')]

# The collection of data files that are to be processed by the tasks.
input_file_paths = [os.path.realpath('./data/taskdata1.txt'),
                    os.path.realpath('./data/taskdata2.txt'),
                    os.path.realpath('./data/taskdata3.txt')]

# Upload the application script to Azure Storage. This is the script that
# will process the data files, and is executed by each of the tasks on the
# compute nodes.
application_files = [
    upload_file_to_container(blob_client, APP_CONTAINER_NAME, file_path)
    for file_path in application_file_paths]

# Upload the data files. This is the data that will be processed by each of
# the tasks executed on the compute nodes in the pool.
input_files = [
    upload_file_to_container(blob_client, INPUT_CONTAINER_NAME, file_path)
    for file_path in input_file_paths]

使用列表推导,针对集合中的每个文件调用 upload_file_to_container 函数并填充两个 ResourceFile 集合。 upload_file_to_container 函数如下所示:

def upload_file_to_container(block_blob_client, container_name, path):
    """
    Uploads a local file to an Azure Blob storage container.

    :param block_blob_client: A blob service client.
    :type block_blob_client: `azure.storage.blob.BlockBlobService`
    :param str container_name: The name of the Azure Blob storage container.
    :param str file_path: The local path to the file.
    :rtype: `azure.batch.models.ResourceFile`
    :return: A ResourceFile initialized with a SAS URL appropriate for Batch
    tasks.
    """

    import datetime
    import azure.storage.blob as azureblob
    import azure.batch.models as batchmodels

    blob_name = os.path.basename(path)

    print('Uploading file {} to container [{}]...'.format(path,
                                                          container_name))

    block_blob_client.create_blob_from_path(container_name,
                                            blob_name,
                                            file_path)

    sas_token = block_blob_client.generate_blob_shared_access_signature(
        container_name,
        blob_name,
        permission=azureblob.BlobPermissions.READ,
        expiry=datetime.datetime.utcnow() + datetime.timedelta(hours=2))

    sas_url = block_blob_client.make_blob_url(container_name,
                                              blob_name,
                                              sas_token=sas_token)

    return batchmodels.ResourceFile(file_path=blob_name,
                                    blob_source=sas_url)

ResourceFiles

ResourceFile 提供批处理中的任务,以及 Azure 存储中在任务运行之前下载到计算节点的文件的 URL。 ResourceFile.blob_source 属性指定存在于 Azure 存储的文件的完整 URL。 该 URL 还可以包含用于对文件进行安全访问的共享访问签名 (SAS)。 Batch 中的大多数任务类型都包含 ResourceFiles 属性,这些类型包括:

  • CloudTask
  • StartTask
  • JobPreparationTask
  • JobReleaseTask

本示例不使用 JobPreparationTask 或 JobReleaseTask 任务类型,但可以通过 在 Azure Batch 计算节点上运行作业准备和完成任务来详细了解这些任务类型。

共享访问签名 (SAS)

共享访问签名是一些字符串,可以提供对 Azure 存储中容器和 Blob 的安全访问。 python_tutorial_client.py 脚本使用 Blob 和容器共享访问签名,并演示如何从存储服务获取这些共享访问签名字符串。

  • Blob 共享访问签名:池的 StartTask 在从存储下载任务脚本和输入数据文件时使用 Blob 共享访问签名(请参阅下面的 步骤 3 )。 python_tutorial_client.py 中的 upload_file_to_container 函数包含可用于获取每个 Blob 的共享访问签名的代码。 它通过调用存储模块中的 BlockBlobService.make_blob_url 实现此目的。
  • 容器共享访问签名:每个任务在计算节点上完成其工作后,会将其输出文件上传到 Azure 存储中的 输出 容器。 为此,python_tutorial_task.py 将使用提供容器写入访问权限的容器共享访问签名。 python_tutorial_client.py 中的 get_container_sas_token 函数获取容器的共享访问签名,然后该签名以命令行参数的形式传递给任务。 步骤 5 将任务添加到作业介绍了容器 SAS 的用法。

Tip

请查看有关共享访问签名的两篇系列教程的第 1 部分:了解 SAS 模型第 2 部分:创建 SAS 并将其用于 Blob 服务,详细了解如何提供对存储帐户中数据的安全访问。

步骤 3:创建 Batch 池

创建批处理池

Batch 是 Batch 执行作业任务时所在的计算节点(虚拟机)集合。

将任务脚本和数据文件上传到存储帐户之后, python_tutorial_client.py 将使用批处理 Python 模块开始与批处理服务交互。 为此会创建 BatchServiceClient

# Create a Batch service client. We'll now be interacting with the Batch
# service in addition to Storage.
credentials = batchauth.SharedKeyCredentials(BATCH_ACCOUNT_NAME,
                                             BATCH_ACCOUNT_KEY)

batch_client = batch.BatchServiceClient(
    credentials,
    base_url=BATCH_ACCOUNT_URL)

接下来,调用 create_pool,在 Batch 帐户中创建计算节点池。

def create_pool(batch_service_client, pool_id,
                resource_files, publisher, offer, sku):
    """
    Creates a pool of compute nodes with the specified OS settings.

    :param batch_service_client: A Batch service client.
    :type batch_service_client: `azure.batch.BatchServiceClient`
    :param str pool_id: An ID for the new pool.
    :param list resource_files: A collection of resource files for the pool's
    start task.
    :param str publisher: Marketplace image publisher
    :param str offer: Marketplace image offer
    :param str sku: Marketplace image sku
    """
    print('Creating pool [{}]...'.format(pool_id))

    # Create a new pool of Linux compute nodes using an Azure Virtual Machines
    # Marketplace image. For more information about creating pools of Linux
    # nodes, see: https://docs.azure.cn/batch/batch-linux-nodes/

    # Specify the commands for the pool's start task. The start task is run
    # on each node as it joins the pool, and when it's rebooted or re-imaged.
    # We use the start task to prep the node for running our task script.
    task_commands = [
        # Copy the python_tutorial_task.py script to the "shared" directory
        # that all tasks that run on the node have access to.
        'cp -r $AZ_BATCH_TASK_WORKING_DIR/* $AZ_BATCH_NODE_SHARED_DIR',
        # Install pip and the dependencies for cryptography
        'apt-get update',
        'apt-get -y install python-pip',
        'apt-get -y install build-essential libssl-dev libffi-dev python-dev',
        # Install the azure-storage module so that the task script can access
        # Azure Blob storage
        'pip install azure-storage']

    # Get the node agent SKU and image reference for the virtual machine
    # configuration.
    # For more information about the virtual machine configuration, see:
    # https://docs.azure.cn/batch/batch-linux-nodes/
    sku_to_use, image_ref_to_use = \
        common.helpers.select_latest_verified_vm_image_with_node_agent_sku(
            batch_service_client, publisher, offer, sku)

    new_pool = batch.models.PoolAddParameter(
        id=pool_id,
        virtual_machine_configuration=batchmodels.VirtualMachineConfiguration(
            image_reference=image_ref_to_use,
            node_agent_sku_id=sku_to_use),
        vm_size=_POOL_VM_SIZE,
        target_dedicated=_POOL_NODE_COUNT,
        start_task=batch.models.StartTask(
            command_line=
            common.helpers.wrap_commands_in_shell('linux', task_commands),
            run_elevated=True,
            wait_for_success=True,
            resource_files=resource_files),
        )

    try:
        batch_service_client.pool.add(new_pool)
    except batchmodels.batch_error.BatchErrorException as err:
        print_batch_exception(err)
        raise

创建池时,应定义 PoolAddParameter 用于指定池的几个属性:

  • 池的 IDid - 必需)

    与 Batch 中的大多数实体一样,新池在 Batch 帐户中必须具有唯一 ID。 代码将使用池 ID 引用此池,这也是在 Azure 门户中识别池的方式。

  • 计算节点数target_dedicated - 必需)

    此属性指定应在池中部署多少个 VM。 必须注意,所有批处理帐户都有默认配额,用于限制批处理帐户中的核心(因此也包括计算节点)数目。 可以在 Quotas and limits for the Azure Batch service(Azure Batch 服务的配额和限制)中找到默认配额以及如何提高配额(例如批处理帐户中的核心数目上限)的说明。 如果有类似于“为什么我的池不能包含 X 个以上的节点? ”的疑惑,则原因可能在于此核心配额。

  • 节点的操作系统virtual_machine_configuration cloud_service_configuration - 必需)

    python_tutorial_client.py 中,使用 VirtualMachineConfiguration 创建 Linux 节点池。 common.helpers 中的 select_latest_verified_vm_image_with_node_agent_sku 函数简化了 Azure 虚拟机应用商店映像的用法。 有关使用应用商店映像的详细信息,请参阅在 Azure Batch 池中预配 Linux 计算节点

  • 计算节点的大小vm_size - 必需)

    由于我们要为 VirtualMachineConfiguration 指定 Linux 节点,因此应根据 Azure 中虚拟机的大小指定 VM 大小(在本示例中为 STANDARD_A1)。 同样,请参阅 在 Azure Batch 池中预配 Linux 计算节点 获取详细信息。

  • 启动任务start_task - 可选)

    还可以连同上述物理节点属性一起指定池的 StartTask(可选)。 StartTask 在每个节点加入池以及每次重新启动节点时在该节点上运行。 StartTask 特别适合用于准备计算节点,以便执行任务,例如安装任务将要运行的应用程序。

    在此示例应用程序中,StartTask 将它从存储下载的文件(使用 StartTask 的 resource_files 属性指定),从 StartTask 工作目录复制到在节点上运行的所有任务可以访问的共享目录。 本质上,这会在节点加入池时,将 python_tutorial_task.py 复制到每个节点上的共享目录,因此该节点上运行的任何任务都可以访问它。

可能注意到了对 wrap_commands_in_shell 帮助器函数的调用。 此函数采用不同命令的集合,并针对任务的命令行属性创建适当的单个命令行。

此外,在上述代码片段中,值得注意的问题是,StartTask 的 command_line 属性中使用了两个环境变量:AZ_BATCH_TASK_WORKING_DIRAZ_BATCH_NODE_SHARED_DIR。 自动为 Batch 池中的每个计算节点配置多个特定于 Batch 的环境变量。 由任务执行的任何进程都可以访问这些环境变量。

Tip

若要深入了解批处理池中计算节点上可用的环境变量,以及有关任务工作目录的信息,请参阅 overview of Azure Batch features(Azure Batch 功能概述)中的 Environment settings for tasks(任务的环境设置)及 Files and directories(文件和目录)。

步骤 4:创建 Batch 作业

创建 Batch 作业

Batch 作业 是任务的集合,它与计算节点池相关联。 作业中的任务在关联池的计算节点上执行。

不仅可以使用作业来组织和跟踪相关工作负荷中的任务,还可以使用它来实施特定的约束,例如作业(并扩展到其任务)的最大运行时,以及 Batch 帐户中其他作业的相关作业优先级。 不过在本示例中,该作业仅与步骤 3 中创建的池关联。 未配置任何其他属性。

所有 Batch 作业都与特定的池关联。 此关联指示将要在其上执行作业任务的节点。 可以通过 PoolInformation 属性指定池,如以下代码片段所示。

def create_job(batch_service_client, job_id, pool_id):
    """
    Creates a job with the specified ID, associated with the specified pool.

    :param batch_service_client: A Batch service client.
    :type batch_service_client: `azure.batch.BatchServiceClient`
    :param str job_id: The ID for the job.
    :param str pool_id: The ID for the pool.
    """
    print('Creating job [{}]...'.format(job_id))

    job = batch.models.JobAddParameter(
        job_id,
        batch.models.PoolInformation(pool_id=pool_id))

    try:
        batch_service_client.job.add(job)
    except batchmodels.batch_error.BatchErrorException as err:
        print_batch_exception(err)
        raise

创建作业后,可以添加任务来执行工作。

步骤 5:将任务添加到作业

将任务添加到作业
(1) 将任务添加到作业;(2) 将任务计划为在节点上运行;(3) 任务下载要处理的数据文件

Batch 任务 是在计算节点上执行的各个工作单位。 任务有一个命令行,可运行在该命令行中指定的脚本或可执行文件。

要实际执行工作,必须将任务添加到作业。 每个 CloudTask 都是使用命令行属性以及任务在其命令行自动执行之前下载到节点的 ResourceFiles(如同池的 StartTask)进行配置的。 在本示例中,每个任务只处理一个文件。 因此,其 ResourceFiles 集合包含单个元素。

def add_tasks(batch_service_client, job_id, input_files,
              output_container_name, output_container_sas_token):
    """
    Adds a task for each input file in the collection to the specified job.

    :param batch_service_client: A Batch service client.
    :type batch_service_client: `azure.batch.BatchServiceClient`
    :param str job_id: The ID of the job to which to add the tasks.
    :param list input_files: A collection of input files. One task will be
     created for each input file.
    :param output_container_name: The ID of an Azure Blob storage container to
    which the tasks will upload their results.
    :param output_container_sas_token: A SAS token granting write access to
    the specified Azure Blob storage container.
    """

    print('Adding {} tasks to job [{}]...'.format(len(input_files), job_id))

    tasks = list()

    for input_file in input_files:

        command = ['python $AZ_BATCH_NODE_SHARED_DIR/python_tutorial_task.py '
                   '--filepath {} --numwords {} --storageaccount {} '
                   '--storagecontainer {} --sastoken "{}"'.format(
                    input_file.file_path,
                    '3',
                    _STORAGE_ACCOUNT_NAME,
                    output_container_name,
                    output_container_sas_token)]

        tasks.append(batch.models.TaskAddParameter(
                'topNtask{}'.format(input_files.index(input_file)),
                wrap_commands_in_shell('linux', command),
                resource_files=[input_file]
                )
        )

    batch_service_client.task.add_collection(job_id, tasks)

Important

在访问环境变量(例如 $AZ_BATCH_NODE_SHARED_DIR)或执行节点的 PATH 中找不到的应用程序时,任务命令行必须显式调用 shell,例如,包含 /bin/sh -c MyTaskApplication $MY_ENV_VAR。 如果任务在节点的 PATH 中执行应用程序,而且不引用任何环境变量,就不必要满足此要求。

在上述代码片段中的 for 循环内,可以看到已构造任务的命令行,其中有五个命令行参数已传递到 python_tutorial_task.py

  1. filepath:这是节点上现有文件的本地路径。 在上面步骤 2 所述的 upload_file_to_container 中创建 ResourceFile 对象时,已将文件名用于此属性(ResourceFile 构造函数中的 file_path 参数)。 这意味着可以在节点上 python_tutorial_task.py 所在的同一目录中找到该文件。
  2. numwords:前 N 个单词应写入输出文件。
  3. storageaccount:存储帐户的名称,其拥有任务输出应上传到的容器。
  4. storagecontainer:输出文件应上传到的存储容器的名称。
  5. sastoken:共享访问签名 (SAS),提供对 Azure 存储中输出容器的写访问权限。 Python_tutorial_task.py 脚本在创建其 BlockBlobService 引用时使用此共享访问签名。 此参数提供对容器的写访问权限,且不需要存储帐户的访问密钥。
# NOTE: Taken from python_tutorial_task.py

# Create the blob client using the container's SAS token.
# This allows us to create a client that provides write
# access only to the container.
blob_client = azureblob.BlockBlobService(account_name=args.storageaccount,
                                         sas_token=args.sastoken)

步骤 6:监视任务

监视任务
脚本会:(1) 监视任务的完成状态,(2) 监视将结果数据上传到 Azure 存储的任务

任务在添加到作业后,自动排入队列并计划在与作业关联的池中的计算节点上执行。 根据你指定的设置,Batch 将处理所有任务排队、计划、重试和其他任务管理工作。

监视任务的执行有许多方法。 python_tutorial_client.py 中的 wait_for_tasks_to_complete 函数提供监视任务特定状态的简单示例,在本例中为 completed 状态。

def wait_for_tasks_to_complete(batch_service_client, job_id, timeout):
    """
    Returns when all tasks in the specified job reach the Completed state.

    :param batch_service_client: A Batch service client.
    :type batch_service_client: `azure.batch.BatchServiceClient`
    :param str job_id: The id of the job whose tasks should be to monitored.
    :param timedelta timeout: The duration to wait for task completion. If all
    tasks in the specified job do not reach Completed state within this time
    period, an exception will be raised.
    """
    timeout_expiration = datetime.datetime.now() + timeout

    print("Monitoring all tasks for 'Completed' state, timeout in {}..."
          .format(timeout), end='')

    while datetime.datetime.now() < timeout_expiration:
        print('.', end='')
        sys.stdout.flush()
        tasks = batch_service_client.task.list(job_id)

        incomplete_tasks = [task for task in tasks if
                            task.state != batchmodels.TaskState.completed]
        if not incomplete_tasks:
            print()
            return True
        else:
            time.sleep(1)

    print()
    raise RuntimeError("ERROR: Tasks did not reach 'Completed' state within "
                       "timeout period of " + str(timeout))

步骤 7:下载任务输出

从存储空间下载任务输出

完成作业后,可以从 Azure 存储下载任务的输出。 可通过在 python_tutorial_client.py 中调用 download_blobs_from_container 来实现此目的:

def download_blobs_from_container(block_blob_client,
                                  container_name, directory_path):
    """
    Downloads all blobs from the specified Azure Blob storage container.

    :param block_blob_client: A blob service client.
    :type block_blob_client: `azure.storage.blob.BlockBlobService`
    :param container_name: The Azure Blob storage container from which to
     download files.
    :param directory_path: The local directory to which to download the files.
    """
    print('Downloading all files from container [{}]...'.format(
        container_name))

    container_blobs = block_blob_client.list_blobs(container_name)

    for blob in container_blobs.items:
        destination_file_path = os.path.join(directory_path, blob.name)

        block_blob_client.get_blob_to_path(container_name,
                                           blob.name,
                                           destination_file_path)

        print('  Downloaded blob [{}] from container [{}] to {}'.format(
            blob.name,
            container_name,
            destination_file_path))

    print('  Download complete!')

Note

python_tutorial_client.py 中调用 download_blobs_from_container 可指定应将文件下载到主目录。 可以随意修改此输出位置。

步骤 8:删除容器

由于需要对位于 Azure 存储中的数据付费,因此我们建议删除 Batch 作业不再需要的所有 Blob。 在 python_tutorial_client.py 中,可通过调用 BlockBlobService.delete_container 三次来实现此目的:

# Clean up storage resources
print('Deleting containers...')
blob_client.delete_container(app_container_name)
blob_client.delete_container(input_container_name)
blob_client.delete_container(output_container_name)

步骤 9:删除作业和池

在最后一个步骤,系统会提示删除 python_tutorial_client.py 脚本创建的作业和池。 虽然作业和任务本身不收费,但计算节点收费。 因此,建议只在需要的时候分配节点。 在维护过程中,可能需要删除未使用的池。

BatchServiceClient 的 JobOperationsPoolOperations 都有对应的删除方法(在确认删除时调用):

# Clean up Batch resources (if the user so chooses).
if query_yes_no('Delete job?') == 'yes':
    batch_client.job.delete(_JOB_ID)

if query_yes_no('Delete pool?') == 'yes':
    batch_client.pool.delete(_POOL_ID)

Important

请记住,需要支付计算资源的费用,而删除未使用的池可将费用降到最低。 另请注意,删除池也会删除该池内的所有计算节点,并且删除池后,将无法恢复节点上的任何数据。

运行示例脚本

从教程代码示例运行 python_tutorial_client.py 脚本时,控制台输出如下所示。 出现 Monitoring all tasks for 'Completed' state, timeout in 0:20:00... 后将会暂停,此时会创建、启动池的计算节点,并执行池启动任务中的命令。 在执行期间和之后,可以使用 Azure 门户 监视池、计算节点、作业和任务。 使用 Azure 门户Azure 存储资源管理器可以查看应用程序创建的存储资源(容器和 Blob)。

Tip

azure-batch-samples/Python/Batch/article_samples 目录内部运行 python_tutorial_client.py 脚本。 该脚本使用相对路径导入 common.helpers 模块,因此,如果不从此目录内部运行该脚本,可能会看到 ImportError: No module named 'common'

以默认配置运行本示例时,执行时间通常大约为 5-7 分钟

Sample start: 2016-05-20 22:47:10

Uploading file /home/user/py_tutorial/python_tutorial_task.py to container [application]...
Uploading file /home/user/py_tutorial/data/taskdata1.txt to container [input]...
Uploading file /home/user/py_tutorial/data/taskdata2.txt to container [input]...
Uploading file /home/user/py_tutorial/data/taskdata3.txt to container [input]...
Creating pool [PythonTutorialPool]...
Creating job [PythonTutorialJob]...
Adding 3 tasks to job [PythonTutorialJob]...
Monitoring all tasks for 'Completed' state, timeout in 0:20:00..........................................................................
  Success! All tasks reached the 'Completed' state within the specified timeout period.
Downloading all files from container [output]...
  Downloaded blob [taskdata1_OUTPUT.txt] from container [output] to /home/user/taskdata1_OUTPUT.txt
  Downloaded blob [taskdata2_OUTPUT.txt] from container [output] to /home/user/taskdata2_OUTPUT.txt
  Downloaded blob [taskdata3_OUTPUT.txt] from container [output] to /home/user/taskdata3_OUTPUT.txt
  Download complete!
Deleting containers...

Sample end: 2016-05-20 22:53:12
Elapsed time: 0:06:02

Delete job? [Y/n]
Delete pool? [Y/n]

Press ENTER to exit...

后续步骤

随意更改 python_tutorial_client.pypython_tutorial_task.py,体验不同的计算方案。 例如,尝试将执行延迟添加到 python_tutorial_task.py,模拟长时间运行的任务并在门户中监视这些任务。 尝试添加更多任务,或调整计算节点的数目。 添加逻辑来检查并允许使用现有池,以加速执行时间。

熟悉 Batch 解决方案的基本工作流后,接下来可以深入了解 Batch 服务的其他功能。

  • 如果对 Batch 服务不熟悉,建议查看 Azure Batch 功能概述 一文。
  • 通过 TopNWords 示例了解有关使用 Batch 处理“前 N 个单词”工作负荷的不同实现方式。