Publish and track machine learning pipelines

This article will show you how to share a machine learning pipeline with your colleagues or customers.

Machine learning pipelines are reusable workflows for machine learning tasks. One benefit of pipelines is increased collaboration. You can also version pipelines, allowing customers to use the current model while you're working on a new version.

Prerequisites

Publish a pipeline

Once you have a pipeline up and running, you can publish it so that it runs with different inputs. For the REST endpoint of an already published pipeline to accept parameters, you must configure your pipeline to use PipelineParameter objects for the arguments that will vary.

  1. To create a pipeline parameter, use a PipelineParameter object with a default value.

    from azureml.pipeline.core.graph import PipelineParameter
    
    pipeline_param = PipelineParameter(
      name="pipeline_arg",
      default_value=10)
    
  2. Add this PipelineParameter object as a parameter to any of the steps in the pipeline as follows:

    from azureml.pipeline.steps import PythonScriptStep

    # Pass the pipeline parameter to the step as a script argument
    compareStep = PythonScriptStep(
      script_name="compare.py",
      arguments=["--comp_data1", comp_data1, "--comp_data2", comp_data2, "--output_data", out_data3, "--param1", pipeline_param],
      inputs=[comp_data1, comp_data2],
      outputs=[out_data3],
      compute_target=compute_target,
      source_directory=project_folder)
    
  3. Publish this pipeline. When invoked, it accepts the parameter.

    published_pipeline1 = pipeline_run1.publish_pipeline(
         name="My_Published_Pipeline",
         description="My Published Pipeline Description",
         version="1.0")
    

Run a published pipeline

All published pipelines have a REST endpoint. With the pipeline endpoint, you can trigger a run of the pipeline from any external system, including non-Python clients. This endpoint enables "managed repeatability" in batch scoring and retraining scenarios.
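
The endpoint URI is a property of the published pipeline object. As a minimal sketch, assuming the published_pipeline1 object from the previous section and ws as your Workspace object (the ID string is hypothetical):

from azureml.pipeline.core import PublishedPipeline

# The REST endpoint is exposed directly on the published pipeline object
rest_endpoint = published_pipeline1.endpoint

# You can also retrieve a published pipeline later by its ID
p = PublishedPipeline.get(ws, id="<published-pipeline-id>")
print(p.endpoint)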

Important

If you are using role-based access control (RBAC) to manage access to your pipeline, set the permissions for your pipeline scenario (training or scoring).

To invoke a run of the preceding pipeline, you need an Azure Active Directory authentication header token. Getting such a token is described in the AzureCliAuthentication class reference and in the Authentication in Azure Machine Learning notebook.
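
As one example, a minimal sketch that uses interactive login to build the header passed as aad_token in the request below (InteractiveLoginAuthentication is one of several authentication classes; pick the one that fits your scenario):

from azureml.core.authentication import InteractiveLoginAuthentication

# Prompts for interactive sign-in; suited to notebooks and local experimentation
interactive_auth = InteractiveLoginAuthentication()
aad_token = interactive_auth.get_authentication_header()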

from azureml.pipeline.core import PublishedPipeline
import requests

response = requests.post(published_pipeline1.endpoint,
                         headers=aad_token,
                         json={"ExperimentName": "My_Pipeline",
                               "ParameterAssignments": {"pipeline_arg": 20}})

The json argument to the POST request must contain, for the ParameterAssignments key, a dictionary containing the pipeline parameters and their values. In addition, the json argument may contain the following keys:

ExperimentName: The name of the experiment associated with this endpoint
Description: Freeform text describing the endpoint
Tags: Freeform key-value pairs that can be used to label and annotate requests
DataSetDefinitionValueAssignments: Dictionary used for changing datasets without retraining (see discussion below)
DataPathAssignments: Dictionary used for changing datapaths without retraining (see discussion below)
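
For instance, a request that also sets a description and tags might look like the following sketch (the tag names and values are illustrative, not required by the service):

response = requests.post(published_pipeline1.endpoint,
                         headers=aad_token,
                         json={"ExperimentName": "My_Pipeline",
                               "Description": "Nightly scoring run",
                               "Tags": {"trigger": "scheduled", "owner": "ml-team"},
                               "ParameterAssignments": {"pipeline_arg": 20}})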

Run a published pipeline using C#

The following code shows how to call a pipeline asynchronously from C#. The partial code snippet just shows the call structure and isn't part of a Microsoft sample. It doesn't show complete classes or error handling.

// Partial snippet; these usings cover the types shown (Newtonsoft.Json provides JsonConvert and JObject)
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Runtime.Serialization;
using System.Text;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

[DataContract]
public class SubmitPipelineRunRequest
{
    [DataMember]
    public string ExperimentName { get; set; }

    [DataMember]
    public string Description { get; set; }

    [DataMember(IsRequired = false)]
    public IDictionary<string, string> ParameterAssignments { get; set; }
}

// ... in its own class and method ... 
const string RestEndpoint = "your-pipeline-endpoint";

using (HttpClient client = new HttpClient())
{
    var submitPipelineRunRequest = new SubmitPipelineRunRequest()
    {
        ExperimentName = "YourExperimentName", 
        Description = "Asynchronous C# REST api call", 
        ParameterAssignments = new Dictionary<string, string>
        {
            {
                // Replace with your pipeline parameter keys and values
                "your-pipeline-parameter", "default-value"
            }
        }
    };

    string auth_key = "your-auth-key"; 
    client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", auth_key);

    // submit the job
    var requestPayload = JsonConvert.SerializeObject(submitPipelineRunRequest);
    var httpContent = new StringContent(requestPayload, Encoding.UTF8, "application/json");
    var submitResponse = await client.PostAsync(RestEndpoint, httpContent).ConfigureAwait(false);
    if (!submitResponse.IsSuccessStatusCode)
    {
        await WriteFailedResponse(submitResponse); // ... method not shown ...
        return;
    }

    var result = await submitResponse.Content.ReadAsStringAsync().ConfigureAwait(false);
    var obj = JObject.Parse(result);
    // ... use `obj` dictionary to access results
}

Run a published pipeline using Java

The following code shows a call to a pipeline that requires authentication (see Set up authentication for Azure Machine Learning resources and workflows). If your pipeline is deployed publicly, you don't need the calls that produce authKey. The partial code snippet doesn't show Java class and exception-handling boilerplate. The code uses Optional.flatMap for chaining together functions that may return an empty Optional. The use of flatMap shortens and clarifies the code, but note that getRequestBody() swallows exceptions.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Optional;
// JSON library
import com.google.gson.Gson;

String scoringUri = "scoring-endpoint";
String tenantId = "your-tenant-id";
String clientId = "your-client-id";
String clientSecret = "your-client-secret";
String resourceManagerUrl = "https://management.azure.com";
String dataToBeScored = "{ \"ExperimentName\" : \"My_Pipeline\", \"ParameterAssignments\" : { \"pipeline_arg\" : \"20\" }}";

HttpClient client = HttpClient.newBuilder().build();
Gson gson = new Gson();

HttpRequest tokenAuthenticationRequest = tokenAuthenticationRequest(tenantId, clientId, clientSecret, resourceManagerUrl);
Optional<String> authBody = getRequestBody(client, tokenAuthenticationRequest);
Optional<String> authKey = authBody.flatMap(body -> Optional.of(gson.fromJson(body, AuthenticationBody.class).access_token));
Optional<HttpRequest> scoringRequest = authKey.flatMap(key -> Optional.of(scoringRequest(key, scoringUri, dataToBeScored)));
Optional<String> scoringResult = scoringRequest.flatMap(req -> getRequestBody(client, req));
// ... etc (`scoringResult.orElse()`) ... 

static HttpRequest tokenAuthenticationRequest(String tenantId, String clientId, String clientSecret, String resourceManagerUrl)
{
    String authUrl = String.format("https://login.microsoftonline.com/%s/oauth2/token", tenantId);
    String clientIdParam = String.format("client_id=%s", clientId);
    String resourceParam = String.format("resource=%s", resourceManagerUrl);
    String clientSecretParam = String.format("client_secret=%s", clientSecret);

    String bodyString = String.format("grant_type=client_credentials&%s&%s&%s", clientIdParam, resourceParam, clientSecretParam);

    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create(authUrl))
        .POST(HttpRequest.BodyPublishers.ofString(bodyString))
        .build();
    return request;
}

static HttpRequest scoringRequest(String authKey, String scoringUri, String dataToBeScored)
{
    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create(scoringUri))
        .header("Authorization", String.format("Token %s", authKey))
        .POST(HttpRequest.BodyPublishers.ofString(dataToBeScored))
        .build();
    return request;
}

static Optional<String> getRequestBody(HttpClient client, HttpRequest request) {
    try {
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            System.out.println(String.format("Unexpected server response %d", response.statusCode()));
            return Optional.empty();
        }
        return Optional.of(response.body());
    } catch (Exception x) {
        System.out.println(x.toString());
        return Optional.empty();
    }
}

class AuthenticationBody {
    String access_token;
    String token_type;
    int expires_in;
    String scope;
    String refresh_token;
    String id_token;
    
    AuthenticationBody() {}
}

Changing datasets and datapaths without retraining

You may want to train and run inference on different datasets and datapaths. For instance, you may wish to train on a smaller dataset but run inference on the complete dataset. You switch datasets with the DataSetDefinitionValueAssignments key in the request's json argument. You switch datapaths with DataPathAssignments. The technique for both is similar (a datapath sketch follows the steps below):

  1. In your pipeline definition script, create a PipelineParameter for the dataset. Create a DatasetConsumptionConfig or DataPath from the PipelineParameter:

    from azureml.core import Dataset
    from azureml.data.dataset_consumption_config import DatasetConsumptionConfig
    from azureml.pipeline.core import PipelineParameter

    tabular_dataset = Dataset.Tabular.from_delimited_files('https://dprepdata.blob.core.windows.net/demo/Titanic.csv')
    tabular_pipeline_param = PipelineParameter(name="tabular_ds_param", default_value=tabular_dataset)
    tabular_ds_consumption = DatasetConsumptionConfig("tabular_dataset", tabular_pipeline_param)
    
  2. In your ML script, access the dynamically specified dataset using Run.get_context().input_datasets:

    from azureml.core import Run
    
    input_tabular_ds = Run.get_context().input_datasets['tabular_dataset']
    dataframe = input_tabular_ds.to_pandas_dataframe()
    # ... etc ...
    

    Notice that the ML script accesses the value specified for the DatasetConsumptionConfig (tabular_dataset) and not the value of the PipelineParameter (tabular_ds_param).

  3. In your pipeline definition script, set the DatasetConsumptionConfig as a parameter to the PythonScriptStep:

    from azureml.pipeline.core import Pipeline
    from azureml.pipeline.steps import PythonScriptStep

    train_step = PythonScriptStep(
        name="train_step",
        script_name="train_with_dataset.py",
        arguments=["--param1", tabular_ds_consumption],
        inputs=[tabular_ds_consumption],
        compute_target=compute_target,
        source_directory=source_directory)
    
    pipeline = Pipeline(workspace=ws, steps=[train_step])
    
  4. To switch datasets dynamically in your inferencing REST call, use DataSetDefinitionValueAssignments:

    tabular_ds1 = Dataset.Tabular.from_delimited_files('path_to_training_dataset')
    tabular_ds2 = Dataset.Tabular.from_delimited_files('path_to_inference_dataset')
    ds1_id = tabular_ds1.id
    ds2_id = tabular_ds2.id

    response = requests.post(rest_endpoint,
                             headers=aad_token,
                             json={"ExperimentName": "MyRestPipeline",
                                   "DataSetDefinitionValueAssignments": {
                                       "tabular_ds_param": {
                                           "SavedDataSetReference": {"Id": ds1_id}  # or ds2_id
                                       }}})
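
Datapaths follow the same pattern. A sketch, assuming datastore is your Datastore object and that entries under the DataPathAssignments key take a datastore name and a relative path, as in the DataPath notebook referenced below:

from azureml.data.datapath import DataPath, DataPathComputeBinding
from azureml.pipeline.core import PipelineParameter

# Default datapath baked into the pipeline definition;
# pass datapath_input to the step's arguments/inputs as with the dataset above
data_path = DataPath(datastore=datastore, path_on_datastore='training_data')
datapath_param = PipelineParameter(name="input_datapath", default_value=data_path)
datapath_input = (datapath_param, DataPathComputeBinding(mode='mount'))

# At submission time, point the same pipeline at different data
response = requests.post(rest_endpoint,
                         headers=aad_token,
                         json={"ExperimentName": "MyRestPipeline",
                               "DataPathAssignments": {
                                   "input_datapath": {
                                       "DataStoreName": datastore.name,
                                       "RelativePath": "inference_data"}}})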
    

The notebooks Showcasing Dataset and PipelineParameter and Showcasing DataPath and PipelineParameter have complete examples of this technique.

Create a versioned pipeline endpoint

You can create a Pipeline Endpoint with multiple published pipelines behind it. This technique gives you a fixed REST endpoint as you iterate on and update your ML pipelines.

from azureml.pipeline.core import PipelineEndpoint

# Put the pipeline published earlier (published_pipeline1) behind a fixed endpoint
pipeline_endpoint = PipelineEndpoint.publish(workspace=ws, name="PipelineEndpointTest",
                                             pipeline=published_pipeline1, description="Test description Notebook")

Submit a job to a pipeline endpoint

You can submit a job to the default version of a pipeline endpoint:

pipeline_endpoint_by_name = PipelineEndpoint.get(workspace=ws, name="PipelineEndpointTest")
run_id = pipeline_endpoint_by_name.submit("PipelineEndpointExperiment")
print(run_id)

You can also submit a job to a specific version:

run_id = pipeline_endpoint_by_name.submit("PipelineEndpointExperiment", pipeline_version="0")
print(run_id)

The same can be accomplished using the REST API:

rest_endpoint = pipeline_endpoint_by_name.endpoint
response = requests.post(rest_endpoint, 
                         headers=aad_token, 
                         json={"ExperimentName": "PipelineEndpointExperiment",
                               "RunSource": "API",
                               "ParameterAssignments": {"1": "united", "2":"city"}})

Use published pipelines in the studio

You can also run a published pipeline from the studio:

  1. Sign in to Azure Machine Learning studio.

  2. View your workspace.

  3. On the left, select Endpoints.

  4. On the top, select Pipeline endpoints. (Screenshot: list of published machine learning pipelines.)

  5. Select a specific pipeline to run, consume, or review results of previous runs of the pipeline endpoint.

Disable a published pipeline

To hide a pipeline from your list of published pipelines, you disable it, either in the studio or from the SDK:

# Get the pipeline by using its ID from Azure Machine Learning studio
p = PublishedPipeline.get(ws, id="068f4885-7088-424b-8ce2-eeb9ba5381a6")
p.disable()

You can enable it again with p.enable(). For more information, see the PublishedPipeline class reference.
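
To find the ID to pass to PublishedPipeline.get, you can enumerate the workspace's published pipelines; a minimal sketch:

from azureml.pipeline.core import PublishedPipeline

# List the active published pipelines in the workspace, with their IDs
for pp in PublishedPipeline.list(ws):
    print(pp.name, pp.id)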

Next steps