Tutorial: Run Python scripts through Azure Data Factory using Azure Batch
In this tutorial, you learn how to:

- Authenticate with Batch and Storage accounts
- Develop and run a script in Python
- Create a pool of compute nodes to run an application
- Schedule your Python workloads
- Monitor your analytics pipeline
- Access your log files
The example below runs a Python script that receives CSV input from a blob storage container, performs a data manipulation process, and writes the output to a separate blob storage container.

If you don't have an Azure subscription, create a Trial Subscription before you begin.
Prerequisites

- An installed Python distribution, for local testing.
- The `azure-storage-blob` pip package.
- The `iris.csv` dataset.
- An Azure Batch account and a linked Azure Storage account. See Create a Batch account for more information on how to create and link Batch accounts to storage accounts.
- An Azure Data Factory account. See Create a data factory for more information on how to create a data factory through the Azure portal.
- Batch Explorer.
- Azure Storage Explorer.
Sign in to Azure

Sign in to the Azure portal at https://portal.azure.cn.
Get account credentials

For this example, you need to provide credentials for your Batch and Storage accounts. A straightforward way to get the necessary credentials is in the Azure portal. (You can also get these credentials using the Azure APIs or command-line tools.)

- Select All services > Batch accounts, and then select the name of your Batch account.
- To see the Batch credentials, select Keys. Copy the values of Batch account, URL, and Primary access key to a text editor.
- To see the Storage account name and keys, select Storage account. Copy the values of Storage account name and Key1 to a text editor.
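This tutorial uses the copied values interactively in the portal, Batch Explorer, and Storage Explorer. If you prefer to work against the same accounts from Python, the following is a minimal sketch of where those values go, assuming the `azure-batch` and `azure-storage-blob` packages are installed; every placeholder in angle brackets is an assumption you replace with your own values, and exact constructor keywords can vary slightly between SDK versions.

```python
from azure.batch import BatchServiceClient
from azure.batch.batch_auth import SharedKeyCredentials
from azure.storage.blob import BlobServiceClient

# Batch account: name, URL, and primary access key copied from the portal
batch_credentials = SharedKeyCredentials("<batch-account-name>", "<primary-access-key>")
batch_client = BatchServiceClient(batch_credentials, batch_url="<batch-account-url>")

# Storage account: name and Key1 copied from the portal (Azure China blob endpoint)
blob_service_client = BlobServiceClient(
    account_url="https://<storage-account-name>.blob.core.chinacloudapi.cn",
    credential="<storage-account-key1>",
)
```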
Create a Batch pool using Batch Explorer

In this section, you'll use Batch Explorer to create the Batch pool that your Azure Data Factory pipeline will use; an equivalent programmatic pool definition is sketched after the steps below.
- Sign in to Batch Explorer using your Azure credentials.
- Select your Batch account.
- Create a pool by selecting Pools on the left side bar, then the Add button above the search form.
- Choose an ID and display name. We'll use `custom-activity-pool` for this example.
- Set the scale type to Fixed size, and set the dedicated node count to 2.
- Under Data science, select Dsvm Windows as the operating system.
- Choose `Standard_f2s_v2` as the virtual machine size.
- Enable the start task and add the command `cmd /c "pip install azure-storage-blob pandas"`. The user identity can remain as the default Pool user.
- Select OK.
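If you'd rather define the same pool in code than click through Batch Explorer, the following is a rough sketch using the `azure-batch` Python SDK. The client setup mirrors the credentials section above, and the image reference values are placeholders for a Windows Data Science VM image; list the images actually available to your Batch account before relying on them.

```python
from azure.batch import BatchServiceClient
from azure.batch import models as batchmodels
from azure.batch.batch_auth import SharedKeyCredentials

credentials = SharedKeyCredentials("<batch-account-name>", "<primary-access-key>")
batch_client = BatchServiceClient(credentials, batch_url="<batch-account-url>")

# Pool equivalent to the Batch Explorer settings: 2 dedicated Standard_f2s_v2 nodes,
# a Windows DSVM image (placeholder values), and a start task that installs the
# Python packages the script needs.
pool = batchmodels.PoolAddParameter(
    id="custom-activity-pool",
    vm_size="Standard_f2s_v2",
    target_dedicated_nodes=2,
    virtual_machine_configuration=batchmodels.VirtualMachineConfiguration(
        image_reference=batchmodels.ImageReference(
            publisher="microsoft-dsvm",   # placeholder: confirm against your account
            offer="dsvm-windows",         # placeholder
            sku="server-2016",            # placeholder
            version="latest",
        ),
        node_agent_sku_id="batch.node.windows amd64",
    ),
    start_task=batchmodels.StartTask(
        command_line='cmd /c "pip install azure-storage-blob pandas"',
        wait_for_success=True,
    ),
)
batch_client.pool.add(pool)
```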
Create blob containers

Here you'll create blob containers that will store your input and output files for the Batch job; a scripted alternative is sketched after these steps.
- Sign in to Storage Explorer using your Azure credentials.
- Using the storage account linked to your Batch account, create two blob containers (one for input files, one for output files) by following the steps at Create a blob container.
  - In this example, we'll call our input container `input`, and our output container `output`.
- Upload `iris.csv` to your input container `input` using Storage Explorer by following the steps at Managing blobs in a blob container.
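Storage Explorer is all this tutorial needs, but if you want to script these steps instead, here is a minimal sketch using the `azure-storage-blob` package; the account URL and key are placeholders you fill in with your own values.

```python
from azure.storage.blob import BlobServiceClient

blob_service_client = BlobServiceClient(
    account_url="https://<storage-account-name>.blob.core.chinacloudapi.cn",
    credential="<storage-account-key>",
)

# Create the input and output containers used by this tutorial
for name in ("input", "output"):
    blob_service_client.create_container(name)

# Upload the dataset to the input container
with open("iris.csv", "rb") as data:
    blob_service_client.get_container_client("input").upload_blob(name="iris.csv", data=data)
```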
Develop a script in Python

The following Python script loads the `iris.csv` dataset from your `input` container, performs a data manipulation process, and saves the results back to the `output` container.
```python
# Load libraries
from azure.storage.blob import BlobServiceClient
import pandas as pd

# Define parameters
# The account URL includes the Azure China endpoint suffix, for example:
# https://<storage-account-name>.blob.core.chinacloudapi.cn
storageAccountURL = "<storage-account-url>"
storageKey = "<storage-account-key>"
containerName = "output"

# Establish connection with the blob storage account
blob_service_client = BlobServiceClient(account_url=storageAccountURL,
                                        credential=storageKey)

# Load iris dataset from the task node
df = pd.read_csv("iris.csv")

# Keep only the setosa records
df = df[df['Species'] == "setosa"]

# Save the subset of the iris dataframe locally on the task node
df.to_csv("iris_setosa.csv", index=False)

# Upload the result to the output container
container_client = blob_service_client.get_container_client(containerName)
with open("iris_setosa.csv", "rb") as data:
    blob_client = container_client.upload_blob(name="iris_setosa.csv", data=data)
```
Save the script as `main.py` and upload it to the Azure Storage `input` container. Be sure to test and validate its functionality locally before uploading it to your blob container:

```
python main.py
```
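After a successful local run, you can optionally confirm that `iris_setosa.csv` reached the `output` container. The snippet below is a small sketch using the same `azure-storage-blob` package; the account URL and key placeholders are assumptions you replace with your own values.

```python
from azure.storage.blob import BlobServiceClient

blob_service_client = BlobServiceClient(
    account_url="https://<storage-account-name>.blob.core.chinacloudapi.cn",
    credential="<storage-account-key>",
)

# List the blobs in the output container to confirm iris_setosa.csv was written
for blob in blob_service_client.get_container_client("output").list_blobs():
    print(blob.name)
```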
Set up an Azure Data Factory pipeline

In this section, you'll create and validate a pipeline using your Python script.
- Follow the steps to create a data factory under the "Create a data factory" section of this article.
- In the Factory Resources box, select the + (plus) button and then select Pipeline.
- In the General tab, set the name of the pipeline as "Run Python".
- In the Activities box, expand Batch Service. Drag the custom activity from the Activities toolbox to the pipeline designer surface.
- In the General tab, specify testPipeline for Name.
- In the Azure Batch tab, add the Batch account that was created in the previous steps, and select Test connection to ensure that it is successful.
- In the Settings tab, enter the command `python main.py`. For the Resource Linked Service, add the storage account that was created in the previous steps. Test the connection to ensure it is successful.
- In the Folder Path, select the name of the Azure Blob Storage container that contains the Python script and the associated inputs. The selected files will be downloaded from the container to the pool node instances before the Python script runs.
- Click Validate on the pipeline toolbar above the canvas to validate the pipeline settings. Confirm that the pipeline has been successfully validated. To close the validation output, select the >> (right arrow) button.
- Click Debug to test the pipeline and ensure it works accurately.
- Click Publish to publish the pipeline.
- Click Trigger to run the Python script as part of a batch process.
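Triggering from the Data Factory UI is enough for this tutorial. If you later want to start the published pipeline from code, the following is a rough sketch using the `azure-mgmt-datafactory` and `azure-identity` packages; the subscription, resource group, and factory names are placeholders, and sovereign clouds such as Azure China need additional endpoint configuration that is omitted here.

```python
from azure.identity import DefaultAzureCredential
from azure.mgmt.datafactory import DataFactoryManagementClient

# Management client for the data factory (subscription ID is a placeholder)
adf_client = DataFactoryManagementClient(DefaultAzureCredential(), "<subscription-id>")

# Start a run of the published "Run Python" pipeline
run = adf_client.pipelines.create_run(
    resource_group_name="<resource-group>",
    factory_name="<data-factory-name>",
    pipeline_name="Run Python",
)
print(f"Started pipeline run {run.run_id}")
```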
Monitor the log files

If warnings or errors are produced when your script runs, you can check `stdout.txt` or `stderr.txt` for more information on the output that was logged; a sketch for downloading these files programmatically follows the steps below.
- Select Jobs from the left-hand side of Batch Explorer.
- Choose the job created by your data factory. Assuming you named your pool `custom-activity-pool`, select `adfv2-custom-activity-pool`.
- Click on the task that had a failure exit code.
- View `stdout.txt` and `stderr.txt` to investigate and diagnose your problem.
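Batch Explorer shows these files directly, but you can also pull them down with the `azure-batch` SDK. The sketch below assumes the job is named after the pool as described above and that you already know the failing task's ID; the credential placeholders match the earlier sections.

```python
from azure.batch import BatchServiceClient
from azure.batch.batch_auth import SharedKeyCredentials

credentials = SharedKeyCredentials("<batch-account-name>", "<primary-access-key>")
batch_client = BatchServiceClient(credentials, batch_url="<batch-account-url>")

# Stream stderr.txt from the failing task to a local file for inspection
stream = batch_client.file.get_from_task("adfv2-custom-activity-pool", "<task-id>", "stderr.txt")
with open("stderr.txt", "wb") as f:
    for chunk in stream:
        f.write(chunk)
```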
Clean up resources

Although you're not charged for jobs and tasks themselves, you are charged for compute nodes, so we recommend that you allocate pools only as needed. When you delete the pool, all task output on the nodes is deleted. However, the input and output files remain in the storage account. When they're no longer needed, you can also delete the Batch account and the storage account.
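You can delete the pool in Batch Explorer or, if you've been scripting along, with a single SDK call. The sketch below reuses the placeholder credentials from the earlier sections.

```python
from azure.batch import BatchServiceClient
from azure.batch.batch_auth import SharedKeyCredentials

credentials = SharedKeyCredentials("<batch-account-name>", "<primary-access-key>")
batch_client = BatchServiceClient(credentials, batch_url="<batch-account-url>")

# Deleting the pool releases (and stops billing for) the compute nodes;
# blobs in the input and output containers are not affected.
batch_client.pool.delete("custom-activity-pool")
```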
Next steps

In this tutorial, you learned how to:

- Authenticate with Batch and Storage accounts
- Develop and run a script in Python
- Create a pool of compute nodes to run an application
- Schedule your Python workloads
- Monitor your analytics pipeline
- Access your log files
To learn more about Azure Data Factory, see the Azure Data Factory documentation.