教程:使用 Python API 通过 Azure Batch 运行并行工作负荷

使用 Azure Batch 在 Azure 中高效运行大规模并行和高性能计算 (HPC) 批处理作业。 本教程通过一个 Python 示例演示了如何使用 Batch 运行并行工作负荷。 你可以学习常用的 Batch 应用程序工作流,以及如何以编程方式与 Batch 和存储资源交互。

  • 通过 Batch 和存储帐户进行身份验证。
  • 将输入文件上传到存储。
  • 创建运行应用程序所需的计算节点池。
  • 创建用于处理输入文件的作业和任务。
  • 监视任务执行情况。
  • 检索输出文件。

在本教程中,你会使用 ffmpeg 开放源代码工具将 MP4 媒体文件并行转换为 MP3 格式。

如果没有 Azure 试用版订阅,请在开始前创建 Azure 试用版订阅

先决条件

登录 Azure

登录 Azure 门户

获取帐户凭据

在此示例中,你需要提供 Batch 帐户和存储帐户的凭据。 若要获取所需凭据,一种直接的方法是使用 Azure 门户。 (也可使用 Azure API 或命令行工具来获取这些凭据。)

  1. 选择所有服务>Batch 帐户,然后选择 Batch 帐户的名称。

  2. 若要查看 Batch 凭据,请选择“密钥”。 将“Batch 帐户”、“URL”和“主访问密钥”的值复制到文本编辑器。

  3. 若要查看存储帐户名称和密钥,请选择“存储帐户”。 将“存储帐户名称”和“Key1”的值复制到文本编辑器。

下载并运行示例应用

下载示例应用

从 GitHub 下载或克隆示例应用。 若要使用 Git 客户端克隆示例应用存储库,请使用以下命令:

git clone https://github.com/Azure-Samples/batch-python-ffmpeg-tutorial.git

导航到包含文件 batch_python_tutorial_ffmpeg.py 的目录。

在 Python 环境中使用 pip 安装所需的包。

pip install -r requirements.txt

使用代码编辑器打开文件 config.py。 将 Batch 和存储帐户的值更新为你的帐户所特有的名称。 此示例使用 DefaultAzureCredential 进行身份验证,因此不再需要帐户密钥。 例如:

_BATCH_ACCOUNT_NAME = 'yourbatchaccount'
_BATCH_ACCOUNT_URL = 'https://yourbatchaccount.yourbatchregion.batch.chinacloudapi.cn'
_STORAGE_ACCOUNT_NAME = 'mystorageaccount'

在运行示例之前,请使用 Azure CLI(az login)登录,或者配置DefaultAzureCredential可以发现的凭据(例如托管标识、Visual Studio Code或环境变量)。 确保已登录的身份已在 Batch 帐户(例如,Azure Batch ContributorReader)和存储帐户(例如,Storage Blob Data Contributor)上被授予相应的 Azure RBAC 角色。

运行应用

若要运行该脚本,请执行以下操作:

python batch_python_tutorial_ffmpeg.py

运行示例应用程序时,控制台输出如下所示。 在执行过程中,当池中的计算节点启动时,会在 Monitoring all tasks for 'Completed' state, timeout in 00:30:00... 处暂停一下。

Sample start: 11/28/2018 3:20:21 PM

Container [input] created.
Container [output] created.
Uploading file LowPriVMs-1.mp4 to container [input]...
Uploading file LowPriVMs-2.mp4 to container [input]...
Uploading file LowPriVMs-3.mp4 to container [input]...
Uploading file LowPriVMs-4.mp4 to container [input]...
Uploading file LowPriVMs-5.mp4 to container [input]...
Creating pool [LinuxFFmpegPool]...
Creating job [LinuxFFmpegJob]...
Adding 5 tasks to job [LinuxFFmpegJob]...
Monitoring all tasks for 'Completed' state, timeout in 00:30:00...
Success! All tasks completed successfully within the specified timeout period.
Deleting container [input]....

Sample end: 11/28/2018 3:29:36 PM
Elapsed time: 00:09:14.3418742

转到 Azure 门户中的 Batch 帐户,监视池、计算节点、作业和任务。 例如,若要查看池中计算节点的热度地图,请选择“”>“LinuxFFmpegPool”。

任务正在运行时,热度地图如下所示:

池热度地图的屏幕截图。

以默认配置运行应用程序时,典型的执行时间大约为 5 分钟。 池创建过程需要最多时间。

检索输出文件

可以使用 Azure 门户下载 ffmpeg 任务生成的输出 MP3 文件。

  1. 单击“所有服务”>“存储帐户”,然后单击存储帐户的名称。
  2. 单击“Blobs”>“output”
  3. 右键单击一个输出 MP3 文件,然后单击“下载”。 在浏览器中按提示打开或保存该文件。

下载输出文件

也可以编程方式从计算节点或存储容器下载这些文件(但在本示例中未演示)。

查看代码

以下部分将示例应用程序细分为多个执行步骤,用于处理 Batch 服务中的工作负荷。 在阅读本文的其余内容时,请参阅 Python 代码,因为我们并没有讨论示例中的每行代码。

对 Blob 和 Batch 客户端进行身份验证

此示例使用 Azure 标识包中的 DefaultAzureCredential 对存储和 Batch 进行身份验证。 DefaultAzureCredential按顺序尝试多种凭据类型(环境变量、托管标识、Azure CLI登录等),这使得相同的代码在本地开发和生产环境中运行,而无需存储帐户密钥。

为了与存储帐户交互,应用使用 azure-storage-blob 包创建使用凭据的 BlobServiceClient 对象。

credential = DefaultAzureCredential()

blob_service_client = BlobServiceClient(
    account_url=f"https://{_STORAGE_ACCOUNT_NAME}.blob.core.chinacloudapi.cn/",
    credential=credential)

该应用创建一个 BatchClient 对象,用于在 Batch 服务中创建和管理池、作业和任务。 Batch 客户端使用相同的 DefaultAzureCredential 通过 Microsoft Entra ID 进行身份验证。

batch_client = BatchClient(
    endpoint=_BATCH_ACCOUNT_URL,
    credential=credential)

上传输入文件

应用使用 blob_client 引用为输入 MP4 文件创建一个存储容器,并为任务输出创建一个容器。 然后,它会调用 upload_file_to_container 函数,将本地 InputFiles 目录中的 MP4 文件上传到容器。 存储中的文件定义为 Batch ResourceFile 对象,Batch 随后可以将这些对象下载到计算节点。

blob_service_client.create_container(input_container_name)
blob_service_client.create_container(output_container_name)
input_file_paths = []

for folder, subs, files in os.walk(os.path.join(sys.path[0], './InputFiles/')):
    for filename in files:
        if filename.endswith(".mp4"):
            input_file_paths.append(os.path.abspath(
                os.path.join(folder, filename)))

# Upload the input files. This is the collection of files that are to be processed by the tasks.
input_files = [
    upload_file_to_container(blob_service_client, input_container_name, file_path)
    for file_path in input_file_paths]

创建计算节点池

然后,该示例会调用 create_pool 以在 Batch 帐户中创建计算节点池。 此定义的函数使用 Batch BatchPoolCreateOptions 类设置节点数、VM 大小和池配置。 此处,VirtualMachineConfiguration 对象指定了一个 BatchVmImageReference,该引用指向 Azure 市场中发布的 Ubuntu Server 20.04 LTS 映像。 Batch 支持 Azure 市场中的各种 VM 映像以及自定义 VM 映像。

节点数和 VM 大小使用定义的常数进行设置。 Batch 支持专用节点,你可以在池中单独使用其中一种,也可以同时使用两种。 专用节点为您的池预留。 现成节点在 Azure 有剩余 VM 容量时以优惠价提供。 如果 Azure 没有足够容量,则现成节点会变得不可用。 默认情况下,此示例会创建一个仅包含五个大小为 Standard_A1_v2 的 Spot 节点的池。

除了物理节点属性,此池配置还包括 BatchStartTask 对象。 BatchStartTask 会在每个节点加入池时在该节点上执行,并且在每次节点重启时也会执行。 在此示例中,BatchStartTask 运行 Bash shell 命令,以在节点上安装 ffmpeg 包和依赖项。

create_pool方法将池提交到 Batch 服务。

new_pool = models.BatchPoolCreateOptions(
    id=pool_id,
    virtual_machine_configuration=models.VirtualMachineConfiguration(
        image_reference=models.BatchVmImageReference(
            publisher="Canonical",
            offer="UbuntuServer",
            sku="20.04-LTS",
            version="latest"
        ),
        node_agent_sku_id="batch.node.ubuntu 20.04"),
    vm_size=_POOL_VM_SIZE,
    target_dedicated_nodes=_DEDICATED_POOL_NODE_COUNT,
    target_low_priority_nodes=_LOW_PRIORITY_POOL_NODE_COUNT,
    start_task=models.BatchStartTask(
        command_line="/bin/bash -c \"apt-get update && apt-get install -y ffmpeg\"",
        wait_for_success=True,
        user_identity=models.UserIdentity(
            auto_user=models.AutoUserSpecification(
                scope=models.AutoUserScope.POOL,
                elevation_level=models.ElevationLevel.ADMIN)),
    )
)
batch_client.create_pool(pool=new_pool)

创建作业

Batch 作业可指定在其中运行任务的池以及可选设置,例如工作的优先级和计划。 此示例通过调用 create_job 创建一个作业。 这个已定义的函数使用 BatchJobCreateOptions 类在你的池中创建作业。 create_job方法将池提交到 Batch 服务。 最初,该作业没有任务。

job = models.BatchJobCreateOptions(
    id=job_id,
    pool_info=models.BatchPoolInfo(pool_id=pool_id))

batch_client.create_job(job=job)

创建任务

应用通过调用 add_tasks 在作业中创建任务。 此定义的函数使用 BatchTaskCreateOptions 类创建任务对象列表。 每个任务都运行 ffmpeg,使用 resource_files 参数来处理输入 command_line 对象。 ffmpeg 此前已在创建池时安装在每个节点上。 在这里,命令行运行 ffmpeg 将每个输入 MP4(视频)文件转换为 MP3(音频)文件。

此示例在运行命令行后为 MP3 文件创建 OutputFile 对象。 每个任务的输出文件(在此示例中为一个)都会使用任务的 output_files 属性上传到关联的存储帐户中的一个容器。

然后,应用使用 create_tasks 方法将任务添加到作业,该方法将任务排成队列以在计算节点上运行。

tasks = list()

for idx, input_file in enumerate(input_files):
    input_file_path = input_file.file_path
    output_file_path = "".join((input_file_path).split('.')[:-1]) + '.mp3'
    command = "/bin/bash -c \"ffmpeg -i {} {} \"".format(
        input_file_path, output_file_path)
    tasks.append(models.BatchTaskCreateOptions(
        id='Task{}'.format(idx),
        command_line=command,
        resource_files=[input_file],
        output_files=[models.OutputFile(
            file_pattern=output_file_path,
            destination=models.OutputFileDestination(
                container=models.OutputFileBlobContainerDestination(
                    container_url=output_container_sas_url)),
            upload_options=models.OutputFileUploadConfiguration(
                upload_condition=models.OutputFileUploadCondition.TASK_SUCCESS))]
    )
    )
batch_client.create_tasks(job_id=job_id, task_collection=tasks)

监控任务

将任务添加到作业后,Batch 会自动将这些任务加入队列并安排它们在关联池中的计算节点上执行。 Batch 根据指定的设置处理所有任务排队、计划、重试和其他任务管理工作。

监视任务的执行有许多方法。 此示例中的 wait_for_tasks_to_complete 函数使用 BatchTaskState 对象在限定时间内监视任务是否处于某一状态,在本例中为“已完成”状态。

while datetime.datetime.now() < timeout_expiration:
    print('.', end='')
    sys.stdout.flush()
    tasks = batch_client.list_tasks(job_id=job_id)

    incomplete_tasks = [task for task in tasks if
                        task.state != models.BatchTaskState.COMPLETED]
    if not incomplete_tasks:
        print()
        return True
    else:
        time.sleep(1)
...

清理资源

运行任务之后,应用自动删除所创建的输入存储容器,并允许你选择是否删除 Batch 池和作业。 BatchClient 的 JobOperationsPoolOperations 类都有删除方法(在确认删除时调用)。 虽然不会就作业和任务本身向您收费,但会就计算节点向您收费。 因此,建议仅在需要时分配池。 删除池时会删除节点上的所有任务输出。 但是,输入和输出文件保留在存储帐户中。

若不再需要资源组、Batch 帐户和存储帐户,请将其删除。 为此,请在 Azure 门户中选择 Batch 帐户所在的资源组,然后选择“删除资源组”。

后续步骤

在本教程中,你了解了如何执行以下操作:

  • 通过 Batch 和存储帐户进行身份验证。
  • 将输入文件上传到存储。
  • 创建运行应用程序所需的计算节点池。
  • 创建用于处理输入文件的作业和任务。
  • 监视任务执行情况。
  • 检索输出文件。

有关如何使用 Python API 来计划和处理 Batch 工作负载的更多示例,请参阅 GitHub 上的 Batch Python 示例