Moving data into and between ML pipeline steps (Python)

This article provides code for importing, transforming, and moving data between steps in an Azure Machine Learning pipeline. For an overview of how data works in Azure Machine Learning, see Access data in Azure storage services. For the benefits and structure of Azure Machine Learning pipelines, see What are Azure Machine Learning pipelines?

This article will show you how to:

  • Use Dataset objects for pre-existing data
  • Access data within your steps
  • Split Dataset data into subsets, such as training and validation subsets
  • Create OutputFileDatasetConfig objects to transfer data to the next pipeline step
  • Use OutputFileDatasetConfig objects as input to pipeline steps
  • Create new Dataset objects from OutputFileDatasetConfig you wish to persist

Prerequisites

You'll need:

Use Dataset objects for pre-existing data

The preferred way to ingest data into a pipeline is to use a Dataset object. Dataset objects represent persistent data available throughout a workspace.

There are many ways to create and register Dataset objects. Tabular datasets are for delimited data available in one or more files. File datasets are for binary data (such as images) or for data that you'll parse. The simplest programmatic ways to create Dataset objects are to use existing blobs in workspace storage or public URLs:

from azureml.core import Dataset, Datastore
from azureml.data.datapath import DataPath

# Get a datastore already registered with the workspace
datastore = Datastore.get(workspace, 'training_data')

# Tabular dataset from a delimited file in the datastore
iris_dataset = Dataset.Tabular.from_delimited_files(DataPath(datastore, 'iris.csv'))

# File dataset from explicit paths and a glob pattern
datastore_path = [
    DataPath(datastore, 'animals/dog/1.jpg'),
    DataPath(datastore, 'animals/dog/2.jpg'),
    DataPath(datastore, 'animals/cat/*.jpg')
]
cats_dogs_dataset = Dataset.File.from_files(path=datastore_path)

For more on creating datasets from different sources, registering them and reviewing them in the Azure Machine Learning UI, understanding how data size interacts with compute capacity, and versioning datasets, see Create Azure Machine Learning datasets.
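For example, a minimal sketch of registering the tabular dataset created above so that later runs can retrieve it by name (the registration name below is illustrative, not part of the original sample):

# Illustrative only: register the dataset under a hypothetical name
iris_dataset = iris_dataset.register(workspace=workspace,
                                     name='iris_training_data',
                                     create_new_version=True)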

Pass datasets to your script

To pass the dataset's path to your script, use the Dataset object's as_named_input() method. You can either pass the resulting DatasetConsumptionConfig object to your script as an argument, or, by using the inputs argument of your pipeline step, retrieve the dataset in the script with Run.get_context().input_datasets[].

Once you've created a named input, you can choose its access mode: as_mount() or as_download(). If your script processes all the files in your dataset and the disk on your compute resource is large enough for the dataset, the download access mode is the better choice. The download access mode avoids the overhead of streaming the data at runtime. If your script accesses a subset of the dataset or it's too large for your compute, use the mount access mode. For more information, read Mount vs. Download.
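For example, a minimal sketch of the two access modes, reusing the iris_dataset created earlier (the variable names are illustrative):

# Illustrative only: the same named input configured with each access mode
mounted_input = iris_dataset.as_named_input('iris').as_mount()        # stream files at runtime
downloaded_input = iris_dataset.as_named_input('iris').as_download()  # copy files to the compute's disk first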

To pass a dataset to your pipeline step:


  1. Use TabularDataset.as_named_input() or FileDataset.as_named_input() (no 's' at end) to create a DatasetConsumptionConfig object
  2. Use as_mount() or as_download() to set the access mode
  3. Pass the datasets to your pipeline steps using either the arguments or the inputs argument

The following snippet shows the common pattern of combining these steps within the PythonScriptStep constructor:


from azureml.pipeline.steps import PythonScriptStep

train_step = PythonScriptStep(
    name="train_data",
    script_name="train.py",
    compute_target=cluster,
    # Mount the dataset and expose it to the script as the named input 'iris'
    inputs=[iris_dataset.as_named_input('iris').as_mount()]
)

Note

You would need to replace the values for all these arguments (that is, "train_data", "train.py", cluster, and iris_dataset) with your own data. The above snippet just shows the form of the call and is not part of a Microsoft sample.

You can also use methods such as random_split() and take_sample() to create multiple inputs or reduce the amount of data passed to your pipeline step:

seed = 42 # PRNG seed
smaller_dataset = iris_dataset.take_sample(0.1, seed=seed) # 10%
train, test = smaller_dataset.random_split(percentage=0.8, seed=seed)

train_step = PythonScriptStep(
    name="train_data",
    script_name="train.py",
    compute_target=cluster,
    inputs=[train.as_named_input('train').as_download(), test.as_named_input('test').as_download()]
)

Access datasets within your script

Named inputs to your pipeline step script are available as a dictionary within the Run object. Retrieve the active Run object using Run.get_context() and then retrieve the dictionary of named inputs using input_datasets. If you passed the DatasetConsumptionConfig object using the arguments argument rather than the inputs argument, access the data using argparse code. Both techniques are demonstrated in the following snippet.

# In pipeline definition script:
# Code for demonstration only: It would be very confusing to split datasets between `arguments` and `inputs`
train_step = PythonScriptStep(
    name="train_data",
    script_name="train.py",
    compute_target=cluster,
    arguments=['--training-folder', train.as_named_input('train').as_download()],
    inputs=[test.as_named_input('test').as_download()]
)

# In pipeline script
import argparse

from azureml.core import Run

parser = argparse.ArgumentParser()
parser.add_argument('--training-folder', type=str, dest='train_folder', help='training data folder mounting point')
args = parser.parse_args()
training_data_folder = args.train_folder

testing_data_folder = Run.get_context().input_datasets['test']

The passed value will be the path to the dataset file(s).
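For example, a quick sanity check inside the step script (a minimal sketch using the variables from the snippet above):

import os

# Illustrative only: inspect what the step actually received
print('training data location:', training_data_folder)
if os.path.isdir(training_data_folder):
    print(os.listdir(training_data_folder))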

It's also possible to access a registered Dataset directly. Since registered datasets are persistent and shared across a workspace, you can retrieve them directly:

from azureml.core import Dataset, Run

run = Run.get_context()
ws = run.experiment.workspace
ds = Dataset.get_by_name(workspace=ws, name='mnist_opendataset')
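What your script does next depends on the dataset type. A minimal sketch covering the two common cases, for illustration only:

from azureml.data import TabularDataset

# Illustrative only: materialize the retrieved dataset according to its type
if isinstance(ds, TabularDataset):
    df = ds.to_pandas_dataframe()   # tabular data as a pandas DataFrame
else:
    local_paths = ds.download(target_path='./data', overwrite=True)  # FileDataset: copy files locally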

Note

The preceding snippets show the form of the calls and are not part of a Microsoft sample. You must replace the various arguments with values from your own project.

OutputFileDatasetConfig 用于中间数据Use OutputFileDatasetConfig for intermediate data

While Dataset objects represent only persistent data, OutputFileDatasetConfig objects can be used for temporary data output from pipeline steps and for persistent output data. OutputFileDatasetConfig supports writing data to blob storage, fileshare, adlsgen1, or adlsgen2. It supports both mount mode and upload mode. In mount mode, files written to the mounted directory are permanently stored when the file is closed. In upload mode, files written to the output directory are uploaded at the end of the job. If the job fails or is canceled, the output directory will not be uploaded.
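As a minimal sketch of the two modes (assuming blob_store is a blob datastore handle such as the one retrieved later in this article):

from azureml.data import OutputFileDatasetConfig

# Illustrative only: mount mode persists files as they are closed; upload mode copies them when the job ends
mounted_output = OutputFileDatasetConfig(name="intermediate_mount",
                                         destination=(blob_store, "intermediate/{run-id}")).as_mount()
uploaded_output = OutputFileDatasetConfig(name="intermediate_upload",
                                          destination=(blob_store, "intermediate/{run-id}")).as_upload()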

An OutputFileDatasetConfig object's default behavior is to write to the default datastore of the workspace. Pass your OutputFileDatasetConfig objects to your PythonScriptStep with the arguments parameter.

from azureml.core import Dataset
from azureml.data import OutputFileDatasetConfig
from azureml.pipeline.steps import PythonScriptStep

# Output is written to the workspace's default datastore unless a destination is given
dataprep_output = OutputFileDatasetConfig()
input_dataset = Dataset.get_by_name(workspace, 'raw_data')

dataprep_step = PythonScriptStep(
    name="prep_data",
    script_name="dataprep.py",
    compute_target=cluster,
    arguments=[input_dataset.as_named_input('raw_data').as_mount(), dataprep_output]
)

You may choose to upload the contents of your OutputFileDatasetConfig object at the end of a run. In that case, use the as_upload() function along with your OutputFileDatasetConfig object, and specify whether to overwrite existing files in the destination.

# Get a blob datastore already registered with the workspace
blob_store = ws.datastores['my_blob_store']
OutputFileDatasetConfig(name="clean_data", destination=(blob_store, 'outputdataset')).as_upload(overwrite=False)

Note

OutputFileDatasetConfig 进行的并发写入会失败。Concurrent writes to a OutputFileDatasetConfig will fail. 请勿尝试共用单个 OutputFileDatasetConfigDo not attempt to use a single OutputFileDatasetConfig concurrently. 不要在多处理情况下(例如使用分布式训练时)共享单个 OutputFileDatasetConfigDo not share a single OutputFileDatasetConfig in a multiprocessing situation, such as when using distributed training.

OutputFileDatasetConfig 用作训练步骤的输出Use OutputFileDatasetConfig as outputs of a training step

Within your pipeline's PythonScriptStep, you can retrieve the available output paths using the program's arguments. If this step is the first and will initialize the output data, you must create the directory at the specified path. You can then write whatever files you wish to be contained in the OutputFileDatasetConfig.

import argparse
import os

parser = argparse.ArgumentParser()
parser.add_argument('--output_path', dest='output_path', required=True)
args = parser.parse_args()

# Make directory for the file, then write the step's output there
os.makedirs(os.path.dirname(args.output_path), exist_ok=True)
with open(args.output_path, 'w') as f:
    f.write("Step 1's output")

Read OutputFileDatasetConfig as inputs to non-initial steps

After the initial pipeline step writes some data to the OutputFileDatasetConfig path and it becomes an output of that initial step, it can be used as an input to a later step.

In the following code:

  • step1_output_data indicates that the output of the PythonScriptStep step1 is written to the ADLS Gen 2 datastore my_adlsgen2 in upload access mode. Learn more about how to set up role permissions in order to write data back to ADLS Gen 2 datastores.

  • After step1 completes and the output is written to the destination indicated by step1_output_data, step2 can use step1_output_data as an input.

from azureml.pipeline.core import Pipeline
from azureml.pipeline.steps import PythonScriptStep

# Get an ADLS Gen 2 datastore already registered with the workspace
datastore = workspace.datastores['my_adlsgen2']
step1_output_data = OutputFileDatasetConfig(name="processed_data", destination=(datastore, "mypath/{run-id}/{output-name}")).as_upload()

step1 = PythonScriptStep(
    name="generate_data",
    script_name="step1.py",
    runconfig=aml_run_config,
    arguments=["--output_path", step1_output_data]
)

step2 = PythonScriptStep(
    name="read_pipeline_data",
    script_name="step2.py",
    compute_target=compute,
    runconfig=aml_run_config,
    arguments=["--pd", step1_output_data.as_input()]
)

pipeline = Pipeline(workspace=workspace, steps=[step1, step2])

Register OutputFileDatasetConfig objects for reuse

If you'd like to make your OutputFileDatasetConfig available for longer than the duration of your experiment, register it to your workspace to share and reuse across experiments.

step1_output_ds = step1_output_data.register_on_complete(name='processed_data',
                                                          description='files from step1')
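In a later experiment or script, the registered dataset can then be retrieved by the name used above; a minimal sketch:

from azureml.core import Dataset

# Illustrative only: retrieve the dataset registered by register_on_complete
processed_ds = Dataset.get_by_name(workspace=ws, name='processed_data')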

Delete OutputFileDatasetConfig contents when no longer needed

Azure does not automatically delete intermediate data written with OutputFileDatasetConfig. To avoid storage charges for large amounts of unneeded data, you should either:

Next steps