Moving data into and between ML pipeline steps (Python)
This article provides code for importing, transforming, and moving data between steps in an Azure Machine Learning pipeline. For an overview of how data works in Azure Machine Learning, see Access data in Azure storage services. For the benefits and structure of Azure Machine Learning pipelines, see What are Azure Machine Learning pipelines?
This article shows you how to:
- Use `Dataset` objects for pre-existing data
- Access data within your steps
- Split `Dataset` data into subsets, such as training and validation subsets
- Create `PipelineData` objects to transfer data to the next pipeline step
- Use `PipelineData` objects as input to pipeline steps
- Create new `Dataset` objects from `PipelineData` you wish to persist
Tip

An improved experience for passing temporary data between pipeline steps and persisting your data after pipeline runs is available in the public preview classes, `OutputFileDatasetConfig` and `OutputTabularDatasetConfig`. These classes are experimental preview features and may change at any time.
Prerequisites
You'll need:
An Azure subscription. If you don't have an Azure subscription, create a trial account before you begin and try the trial version of Azure Machine Learning.
The Azure Machine Learning SDK for Python, or access to Azure Machine Learning studio.
An Azure Machine Learning workspace.
Either create an Azure Machine Learning workspace or use an existing one via the Python SDK. Import the `Workspace` and `Datastore` classes, and load your subscription information from the file `config.json` using the function `from_config()`. This function looks for the JSON file in the current directory by default, but you can also specify a path parameter pointing to the file using `from_config(path="your/file/path")`.

import azureml.core
from azureml.core import Workspace, Datastore

ws = Workspace.from_config()
Some pre-existing data. This article briefly shows the use of an Azure blob container; a hedged sketch of registering one as a datastore follows these prerequisites.
Optional: An existing machine learning pipeline, such as the one described in Create and run machine learning pipelines with Azure Machine Learning SDK.
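If you don't yet have data in your workspace, a minimal sketch of registering an Azure blob container as a datastore and uploading a local file might look like the following. The datastore name matches the one used in later examples, while the container, account, and key values are placeholders you'd replace with your own.

```python
from azureml.core import Datastore

# Placeholder storage details -- substitute your own container, account, and key
blob_datastore = Datastore.register_azure_blob_container(
    workspace=ws,
    datastore_name='training_data',     # name referenced later by Datastore.get()
    container_name='my-container',
    account_name='mystorageaccount',
    account_key='<storage-account-key>'
)

# Upload a local CSV so the examples below have something to read
blob_datastore.upload_files(['./iris.csv'], target_path='.', overwrite=True)
```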
Use `Dataset` objects for pre-existing data
The preferred way to ingest data into a pipeline is to use a `Dataset` object. `Dataset` objects represent persistent data available throughout a workspace.
There are many ways to create and register `Dataset` objects. Tabular datasets are for delimited data available in one or more files. File datasets are for binary data (such as images) or for data that you'll parse yourself. The simplest programmatic ways to create `Dataset` objects are to use existing blobs in workspace storage or public URLs:
datastore = Datastore.get(workspace, 'training_data')
iris_dataset = Dataset.Tabular.from_delimited_files(DataPath(datastore, 'iris.csv'))
cats_dogs_dataset = Dataset.File.from_files(
    paths='https://download.microsoft.com/download/3/E/1/3E1C3F21-ECDB-4869-8368-6DEBA77B919F/kagglecatsanddogs_3367a.zip',
    archive_options=ArchiveOptions(archive_type=ArchiveType.ZIP, entry_glob='**/*.jpg')
)
For more on creating datasets with different options and from different sources, registering them and reviewing them in the Azure Machine Learning UI, understanding how data size interacts with compute capacity, and versioning them, see Create Azure Machine Learning datasets.
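If you intend to reuse such a dataset across pipelines, you can also register it in the workspace. A minimal sketch, with an illustrative registration name:

```python
# Register the tabular dataset under an assumed name so other pipelines can retrieve it by name
iris_dataset = iris_dataset.register(
    workspace=workspace,
    name='iris_training_data',
    create_new_version=True
)
```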
Pass datasets to your script
To pass the dataset's path to your script, use the `Dataset` object's `as_named_input()` method. You can either pass the resulting `DatasetConsumptionConfig` object to your script as an argument or, by using the `inputs` argument to your pipeline step, retrieve the dataset in the script using `Run.get_context().input_datasets[]`.
Once you've created a named input, you can choose its access mode: `as_mount()` or `as_download()`. If your script processes all the files in your dataset and the disk on your compute resource is large enough for the dataset, the download access mode is the better choice; it avoids the overhead of streaming the data at runtime. If your script accesses only a subset of the dataset, or the dataset is too large for your compute, use the mount access mode. For more information, read Mount vs. Download.
To pass a dataset to your pipeline step:
- Use `TabularDataset.as_named_input()` or `FileDataset.as_named_input()` (no 's' at the end) to create a `DatasetConsumptionConfig` object
- Use `as_mount()` or `as_download()` to set the access mode
- Pass the datasets to your pipeline steps using either the `arguments` or the `inputs` argument
The following snippet shows the common pattern of combining these steps within the `PythonScriptStep` constructor:
train_step = PythonScriptStep(
    name="train_data",
    script_name="train.py",
    compute_target=cluster,
    inputs=[iris_dataset.as_named_input('iris').as_mount()]
)
Note

You would need to replace the values for all these arguments (that is, `"train_data"`, `"train.py"`, `cluster`, and `iris_dataset`) with your own data. The above snippet just shows the form of the call and is not part of a Microsoft sample.
You can also use methods such as `random_split()` and `take_sample()` to create multiple inputs or to reduce the amount of data passed to your pipeline step:
seed = 42 # PRNG seed
smaller_dataset = iris_dataset.take_sample(0.1, seed=seed) # 10%
train, test = smaller_dataset.random_split(percentage=0.8, seed=seed)
train_step = PythonScriptStep(
    name="train_data",
    script_name="train.py",
    compute_target=cluster,
    inputs=[train.as_named_input('train').as_download(), test.as_named_input('test').as_download()]
)
Access datasets within your script
Named inputs to your pipeline step script are available as a dictionary within the `Run` object. Retrieve the active `Run` object using `Run.get_context()` and then retrieve the dictionary of named inputs using `input_datasets`. If you passed the `DatasetConsumptionConfig` object using the `arguments` argument rather than the `inputs` argument, access the data using `argparse` code. Both techniques are demonstrated in the following snippet.
# In pipeline definition script:
# Code for demonstration only: It would be very confusing to split datasets between `arguments` and `inputs`
train_step = PythonScriptStep(
    name="train_data",
    script_name="train.py",
    compute_target=cluster,
    arguments=['--training-folder', train.as_named_input('train').as_download()],
    inputs=[test.as_named_input('test').as_download()]
)
# In pipeline script (train.py)
import argparse
from azureml.core import Run

parser = argparse.ArgumentParser()
parser.add_argument('--training-folder', type=str, dest='train_folder', help='training data folder mounting point')
args = parser.parse_args()

training_data_folder = args.train_folder
testing_data_folder = Run.get_context().input_datasets['test']
The passed value will be the path to the dataset file(s).
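Because that value is an ordinary filesystem path on the compute target, the step script can work with it using standard file APIs. Continuing the script above, a hedged sketch that assumes the dataset contains CSV files and that pandas is available in the run's environment:

```python
import os

import pandas as pd  # assumed to be installed in the run's environment

# List the CSV files under the downloaded/mounted folder and load them into one DataFrame
csv_paths = [os.path.join(training_data_folder, name)
             for name in os.listdir(training_data_folder)
             if name.endswith('.csv')]
train_df = pd.concat(pd.read_csv(path) for path in csv_paths)
```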
It's also possible to access a registered `Dataset` directly. Since registered datasets are persistent and shared across a workspace, you can retrieve them directly:
run = Run.get_context()
ws = run.experiment.workspace
ds = Dataset.get_by_name(workspace=ws, name='mnist_opendataset')
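What you do next depends on the dataset type. Assuming `mnist_opendataset` was registered as a `FileDataset`, a minimal sketch of materializing it on the compute might be:

```python
# Download the files to local disk (a FileDataset can also be mounted instead)
local_paths = ds.download(target_path='./mnist_data', overwrite=True)
print(f"Downloaded {len(local_paths)} files")
```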
Note

The preceding snippets show the form of the calls and are not part of a Microsoft sample. You must replace the various arguments with values from your own project.
Use `PipelineData` for intermediate data
While `Dataset` objects represent persistent data, `PipelineData` objects are used for temporary data that is output from pipeline steps. Because the lifespan of a `PipelineData` object is longer than a single pipeline step, you define them in the pipeline definition script. When you create a `PipelineData` object, you must provide a name and a datastore at which the data will reside. Pass your `PipelineData` object(s) to your `PythonScriptStep` using both the `arguments` and the `outputs` arguments:
default_datastore = workspace.get_default_datastore()
dataprep_output = PipelineData("clean_data", datastore=default_datastore)

dataprep_step = PythonScriptStep(
    name="prep_data",
    script_name="dataprep.py",
    compute_target=cluster,
    arguments=["--output-path", dataprep_output],
    inputs=[Dataset.get_by_name(workspace, 'raw_data')],
    outputs=[dataprep_output]
)
You may choose to create your `PipelineData` object using an access mode that provides an immediate upload. In that case, when you create your `PipelineData`, set `output_mode` to `"upload"` and use the `output_path_on_compute` argument to specify the path to which you'll be writing the data:
PipelineData("clean_data", datastore=def_blob_store, output_mode="upload", output_path_on_compute="clean_data_output/")
Note

The preceding snippets show the form of the calls and are not part of a Microsoft sample. You must replace the various arguments with values from your own project.
Tip

An improved experience for passing intermediate data between pipeline steps is available with the public preview class, `OutputFileDatasetConfig`. For a code example using `OutputFileDatasetConfig`, see how to build a two step ML pipeline.
Use `PipelineData` as outputs of a training step
Within your pipeline's `PythonScriptStep`, you can retrieve the available output paths using the program's arguments. If this step is the first and will initialize the output data, you must create the directory at the specified path. You can then write whatever files you wish to be contained in the `PipelineData`.
import argparse
import os

parser = argparse.ArgumentParser()
parser.add_argument('--output_path', dest='output_path', required=True)
args = parser.parse_args()

# Make directory for file
os.makedirs(os.path.dirname(args.output_path), exist_ok=True)

with open(args.output_path, 'w') as f:
    f.write("Step 1's output")
If you created your `PipelineData` with the `is_directory` argument set to `True`, it's enough to perform the `os.makedirs()` call, after which you're free to write whatever files you wish to the path. For more details, see the PipelineData reference documentation.
Read `PipelineData` as inputs to non-initial steps
After the initial pipeline step writes some data to the `PipelineData` path and it becomes an output of that initial step, it can be used as an input to a later step:
step1_output_data = PipelineData("processed_data", datastore=def_blob_store, output_mode="upload")
# get adls gen 2 datastore already registered with the workspace
datastore = workspace.datastores['my_adlsgen2']
step1 = PythonScriptStep(
    name="generate_data",
    script_name="step1.py",
    runconfig=aml_run_config,
    arguments=["--output_path", step1_output_data],
    inputs=[],
    outputs=[step1_output_data]
)

step2 = PythonScriptStep(
    name="read_pipeline_data",
    script_name="step2.py",
    compute_target=compute,
    runconfig=aml_run_config,
    arguments=["--pd", step1_output_data],
    inputs=[step1_output_data]
)
pipeline = Pipeline(workspace=ws, steps=[step1, step2])
The value of a `PipelineData` input is the path to the previous output.
Note

The preceding snippets show the form of the calls and are not part of a Microsoft sample. You must replace the various arguments with values from your own project.
Tip

An improved experience for passing intermediate data between pipeline steps is available with the public preview class, `OutputFileDatasetConfig`. For a code example using `OutputFileDatasetConfig`, see how to build a two step ML pipeline.
If, as shown previously, the first step wrote a single file, consuming it might look like:
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--pd', dest='pd', required=True)
args = parser.parse_args()

with open(args.pd) as f:
    print(f.read())
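If the upstream step instead produced a directory (for example, a `PipelineData` created with `is_directory=True`), a hedged sketch of consuming it would walk the path rather than open it as a single file:

```python
import argparse
import os

parser = argparse.ArgumentParser()
parser.add_argument('--pd', dest='pd', required=True)
args = parser.parse_args()

# Walk the directory produced by the previous step and process each file
for root, _, files in os.walk(args.pd):
    for name in files:
        print(os.path.join(root, name))
```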
Convert `PipelineData` objects to `Dataset`s
If you'd like to make your `PipelineData` available for longer than the duration of a run, use its `as_dataset()` function to convert it to a `Dataset`. You can then register the `Dataset`, making it a first-class citizen in your workspace. Since your `PipelineData` object will have a different path every time the pipeline runs, it's highly recommended that you set `create_new_version` to `True` when registering a `Dataset` created from a `PipelineData` object.
step1_output_ds = step1_output_data.as_dataset()
step1_output_ds.register(name="processed_data", create_new_version=True)
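Once registered, later runs and other pipelines can retrieve the persisted data by name. A minimal sketch, where the name matches the registration above:

```python
from azureml.core import Dataset, Workspace

ws = Workspace.from_config()

# Retrieve the most recent version of the dataset registered above
processed = Dataset.get_by_name(workspace=ws, name="processed_data", version="latest")
```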
Tip

An improved experience for persisting your intermediate data outside of your pipeline runs is available with the public preview class, `OutputFileDatasetConfig`. For a code example using `OutputFileDatasetConfig`, see how to build a two step ML pipeline.