Tutorial: Run Python scripts through Azure Data Factory using Azure Batch

In this tutorial, you'll learn how to:

  • Authenticate with Batch and Storage accounts
  • Develop and run a script in Python
  • Create a pool of compute nodes to run an application
  • Schedule your Python workloads
  • Monitor your analytics pipeline
  • Access your log files

The example below runs a Python script that receives CSV input from a blob storage container, performs a data manipulation process, and writes the output to a separate blob storage container.

If you don't have an Azure subscription, create a trial account before you begin.

Prerequisites

Sign in to Azure

Sign in to the Azure portal at https://portal.azure.cn.

Get account credentials

For this example, you need to provide credentials for your Batch and Storage accounts. A straightforward way to get the necessary credentials is in the Azure portal. (You can also get these credentials using the Azure APIs or command-line tools.)

  1. Select All services > Batch accounts, and then select the name of your Batch account.

  2. To see the Batch credentials, select Keys. Copy the values of Batch account, URL, and Primary access key to a text editor.

  3. To see the Storage account name and keys, select Storage account. Copy the values of Storage account name and Key1 to a text editor.

Create a Batch pool using Batch Explorer

In this section, you'll use Batch Explorer to create the Batch pool that your Azure Data Factory pipeline will use. If you'd rather create the pool programmatically, a sketch using the Batch Python SDK follows these steps.

  1. Sign in to Batch Explorer using your Azure credentials.
  2. Select your Batch account.
  3. Create a pool by selecting Pools on the left side bar, then the Add button above the search form.
    1. Choose an ID and display name. We'll use custom-activity-pool for this example.
    2. Set the scale type to Fixed size, and set the dedicated node count to 2.
    3. Under Data science, select Dsvm Windows as the operating system.
    4. Choose Standard_f2s_v2 as the virtual machine size.
    5. Enable the start task and add the command cmd /c "pip install pandas". The user identity can remain as the default Pool user.
    6. Select OK.
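
Batch Explorer is the quickest way to create the pool, but the same configuration can be scripted. The following is only a minimal sketch using the azure-batch Python SDK: the placeholder account values must be replaced with your own credentials, and the image reference values (publisher, offer, sku) are assumptions for a Windows Data Science VM image that you should verify against what is available in your region.

# Minimal sketch: create the same pool with the azure-batch SDK
from azure.batch import BatchServiceClient
from azure.batch.batch_auth import SharedKeyCredentials
import azure.batch.models as batchmodels

# Batch account credentials copied from the portal (see "Get account credentials")
batchAccountName = "<batch-account-name>"
batchAccountKey  = "<batch-account-key>"
batchAccountUrl  = "<batch-account-url>"

credentials = SharedKeyCredentials(batchAccountName, batchAccountKey)
batchClient = BatchServiceClient(credentials, batch_url=batchAccountUrl)

# Pool definition mirroring the Batch Explorer settings above
newPool = batchmodels.PoolAddParameter(
    id="custom-activity-pool",
    display_name="custom-activity-pool",
    vm_size="Standard_F2s_v2",
    target_dedicated_nodes=2,
    virtual_machine_configuration=batchmodels.VirtualMachineConfiguration(
        # Assumed image reference for a Windows DSVM; adjust to your region/offer
        image_reference=batchmodels.ImageReference(
            publisher="microsoft-dsvm",
            offer="dsvm-windows",
            sku="server-2016",
            version="latest"),
        node_agent_sku_id="batch.node.windows amd64"),
    # Start task installs pandas on every node, matching the command above
    start_task=batchmodels.StartTask(
        command_line='cmd /c "pip install pandas"',
        wait_for_success=True))

batchClient.pool.add(newPool)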

Create blob containers

Here you'll create blob containers that will store your input and output files for the Batch job. If you prefer to script this step, a sketch using the Python storage client follows the steps below.

  1. Sign in to Storage Explorer using your Azure credentials.
  2. Using the storage account linked to your Batch account, create two blob containers (one for input files, one for output files) by following the steps at Create a blob container.
    • In this example, we'll call our input container input, and our output container output.
  3. Upload iris.csv to your input container input using Storage Explorer by following the steps at Managing blobs in a blob container.
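
If you'd rather create the containers and upload the dataset from code instead of Storage Explorer, the following sketch uses the same BlockBlobService client (azure-storage-blob 2.x) that the example script in the next section relies on; the placeholder account values are assumptions to replace with your own.

# Sketch: create the input/output containers and upload iris.csv
from azure.storage.blob import BlockBlobService

storageAccountName = "<storage-account-name>"
storageKey         = "<storage-account-key>"
storageEndpoint    = "core.chinacloudapi.cn"

blobService = BlockBlobService(account_name=storageAccountName,
                               account_key=storageKey,
                               endpoint_suffix=storageEndpoint)

# Create the two containers used throughout this tutorial
blobService.create_container("input")
blobService.create_container("output")

# Upload the iris.csv dataset to the input container
blobService.create_blob_from_path("input", "iris.csv", "iris.csv")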

Develop a script in Python

The following Python script loads the iris.csv dataset from your input container, performs a data manipulation process, and saves the results back to the output container.

# Load libraries
from azure.storage.blob import BlockBlobService
import pandas as pd

# Define parameters
storageAccountName = "<storage-account-name>"
storageKey         = "<storage-account-key>"
containerName      = "output"
storageEndpoint    = "core.chinacloudapi.cn"

# Establish connection with the blob storage account
blobService = BlockBlobService(account_name=storageAccountName,
                               account_key=storageKey,
                               endpoint_suffix=storageEndpoint
                               )

# Load iris dataset from the task node
df = pd.read_csv("iris.csv")

# Subset records
df = df[df['Species'] == "setosa"]

# Save the subset of the iris dataframe locally in task node
df.to_csv("iris_setosa.csv", index = False)

# Upload the iris_setosa.csv subset to the output container
blobService.create_blob_from_path(containerName, "iris_setosa.csv", "iris_setosa.csv")

Save the script as main.py and upload it to the Azure Storage container. Be sure to test and validate its functionality locally before uploading it to your blob container:

python main.py

Set up an Azure Data Factory pipeline

In this section, you'll create and validate a pipeline using your Python script.

  1. Follow the steps to create a data factory under the "Create a data factory" section of this article.

  2. In the Factory Resources box, select the + (plus) button and then select Pipeline.

  3. In the General tab, set the name of the pipeline as "Run Python".

  4. In the Activities box, expand Batch Service. Drag the custom activity from the Activities toolbox to the pipeline designer surface.

  5. In the General tab of the custom activity, specify testPipeline for Name.

  6. In the Azure Batch tab, add the Batch account that was created in the previous steps, then select Test connection to ensure that it is successful.

  7. In the Settings tab, enter the command python main.py.

  8. For the Resource Linked Service, add the storage account that was created in the previous steps. Test the connection to ensure it is successful.

  9. In the Folder Path, select the name of the Azure Blob Storage container that contains the Python script and the associated inputs. This will download the selected files from the container to the pool node instances before the execution of the Python script.

  10. Click Validate on the pipeline toolbar above the canvas to validate the pipeline settings. Confirm that the pipeline has been successfully validated. To close the validation output, select the >> (right arrow) button.

  11. Click Debug to test the pipeline and ensure it works as expected.

  12. Click Publish to publish the pipeline.

  13. Click Trigger to run the Python script as part of a batch process.

Monitor the log files

In case warnings or errors are produced by the execution of your script, you can check out stdout.txt and stderr.txt for more information on the output that was logged. A sketch for retrieving these files programmatically follows the steps below.

  1. Select Jobs from the left-hand side of Batch Explorer.
  2. Choose the job created by your data factory. Assuming you named your pool custom-activity-pool, select adfv2-custom-activity-pool.
  3. Click on the task that had a failure exit code.
  4. View stdout.txt and stderr.txt to investigate and diagnose your problem.
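
You can also retrieve stdout.txt and stderr.txt without leaving Python. The sketch below is an illustration using the azure-batch SDK, assuming the adfv2-custom-activity-pool job ID from the naming convention above; the placeholder credentials are assumptions to replace with your own values.

# Sketch: download the log files for every task in the Data Factory job
from azure.batch import BatchServiceClient
from azure.batch.batch_auth import SharedKeyCredentials

batchAccountName = "<batch-account-name>"
batchAccountKey  = "<batch-account-key>"
batchAccountUrl  = "<batch-account-url>"
jobId            = "adfv2-custom-activity-pool"   # job created by Data Factory

credentials = SharedKeyCredentials(batchAccountName, batchAccountKey)
batchClient = BatchServiceClient(credentials, batch_url=batchAccountUrl)

for task in batchClient.task.list(jobId):
    for fileName in ("stdout.txt", "stderr.txt"):
        # Stream the file from the task's working directory on the node
        stream = batchClient.file.get_from_task(jobId, task.id, fileName)
        with open("{}-{}".format(task.id, fileName), "wb") as f:
            for chunk in stream:
                f.write(chunk)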

Next steps

In this tutorial, you explored an example that taught you how to run Python scripts as part of a pipeline through Azure Data Factory using Azure Batch.

To learn more about Azure Data Factory, see: