本文内容
本文将演示如何与同事或客户共享机器学习管道。
机器学习管道是用于机器学习任务的可重用工作流。 管道的一个益处是加强了协作。 你也可以对管道进行版本管理,这样客户就可以在你使用新版本的同时使用当前的模型。
创建用于保留所有管道资源的 Azure 机器学习工作区
配置开发环境以安装 Azure 机器学习 SDK,或使用已经安装了该 SDK 的 Azure 机器学习计算实例
创建并运行机器学习管道,例如,按照教程:生成 Azure 机器学习管道以用于批量评分中的说明操作。 对于其他选项,请参阅使用 Azure 机器学习 SDK 创建并运行机器学习管道
启动并运行管道之后,你可以发布管道,以便它使用其他输入运行。 若要使已发布的管道的 REST 终结点接受参数,必须将管道配置为对各有差异的参数使用 PipelineParameter
对象。
若要创建管道参数,请使用带默认值的 PipelineParameter 对象。
from azureml.pipeline.core.graph import PipelineParameter pipeline_param = PipelineParameter( name="pipeline_arg", default_value=10)
按如下所示,将此
PipelineParameter
对象作为参数添加到管道中的任一步骤:compareStep = PythonScriptStep( script_name="compare.py", arguments=["--comp_data1", comp_data1, "--comp_data2", comp_data2, "--output_data", out_data3, "--param1", pipeline_param], inputs=[ comp_data1, comp_data2], outputs=[out_data3], compute_target=compute_target, source_directory=project_folder)
发布此管道,调用时它会接受参数。
published_pipeline1 = pipeline_run1.publish_pipeline( name="My_Published_Pipeline", description="My Published Pipeline Description", version="1.0")
所有已发布的管道都具有 REST 终结点。 使用管道终结点,可以从任何外部系统(包括非 Python 客户端)触发管道运行。 在批量评分和重新训练方案中,此终结点支持“托管可重复性”。
重要
如果使用 Azure 基于角色的访问控制 (Azure RBAC) 来管理对管道的访问,请设置管道方案的权限(训练或评分)。
若要调用上述管道的运行,需要 Azure Active Directory 身份验证标头令牌。 AzureCliAuthentication 类参考和 Azure 机器学习中的身份验证笔记本中介绍了如何获取这样的令牌。
from azureml.pipeline.core import PublishedPipeline
import requests
response = requests.post(published_pipeline1.endpoint,
headers=aad_token,
json={"ExperimentName": "My_Pipeline",
"ParameterAssignments": {"pipeline_arg": 20}})
对于 ParameterAssignments
键,POST 请求的 json
参数必须包含一个具有管道参数及其值的字典。 此外,json
参数可能包含以下键:
键 | 说明 |
---|---|
ExperimentName |
与此终结点关联的试验的名称 |
Description |
描述终结点的自由格式文本 |
Tags |
可用于标记和注释请求的自由格式键值对 |
DataSetDefinitionValueAssignments |
用于在不重新训练的情况下更改数据集的字典(请参阅以下讨论) |
DataPathAssignments |
用于在不重新训练的情况下更改数据路径的字典(请参阅以下讨论) |
下面的代码演示如何从 C# 异步调用管道。 部分代码段只显示调用结构,而不是 Microsoft 示例的一部分。 它不会显示完整的类或错误处理。
[DataContract]
public class SubmitPipelineRunRequest
{
[DataMember]
public string ExperimentName { get; set; }
[DataMember]
public string Description { get; set; }
[DataMember(IsRequired = false)]
public IDictionary<string, string> ParameterAssignments { get; set; }
}
// ... in its own class and method ...
const string RestEndpoint = "your-pipeline-endpoint";
using (HttpClient client = new HttpClient())
{
var submitPipelineRunRequest = new SubmitPipelineRunRequest()
{
ExperimentName = "YourExperimentName",
Description = "Asynchronous C# REST api call",
ParameterAssignments = new Dictionary<string, string>
{
{
// Replace with your pipeline parameter keys and values
"your-pipeline-parameter", "default-value"
}
}
};
string auth_key = "your-auth-key";
client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", auth_key);
// submit the job
var requestPayload = JsonConvert.SerializeObject(submitPipelineRunRequest);
var httpContent = new StringContent(requestPayload, Encoding.UTF8, "application/json");
var submitResponse = await client.PostAsync(RestEndpoint, httpContent).ConfigureAwait(false);
if (!submitResponse.IsSuccessStatusCode)
{
await WriteFailedResponse(submitResponse); // ... method not shown ...
return;
}
var result = await submitResponse.Content.ReadAsStringAsync().ConfigureAwait(false);
var obj = JObject.Parse(result);
// ... use `obj` dictionary to access results
}
下面的代码演示对需要身份验证的管道的调用(请参阅为 Azure 机器学习资源和工作流设置身份验证)。 如果管道是公开部署的,则不需要产生 authKey
的调用。 部分代码片段不显示 Java 类和异常处理样板。 代码使用 Optional.flatMap
将可能返回空 Optional
的函数链接在一起。 使用 flatMap
可以缩短和阐明代码,但请注意,getRequestBody()
会吞并异常。
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Optional;
// JSON library
import com.google.gson.Gson;
String scoringUri = "scoring-endpoint";
String tenantId = "your-tenant-id";
String clientId = "your-client-id";
String clientSecret = "your-client-secret";
String resourceManagerUrl = "https://management.chinacloudapi.cn";
String dataToBeScored = "{ \"ExperimentName\" : \"My_Pipeline\", \"ParameterAssignments\" : { \"pipeline_arg\" : \"20\" }}";
HttpClient client = HttpClient.newBuilder().build();
Gson gson = new Gson();
HttpRequest tokenAuthenticationRequest = tokenAuthenticationRequest(tenantId, clientId, clientSecret, resourceManagerUrl);
Optional<String> authBody = getRequestBody(client, tokenAuthenticationRequest);
Optional<String> authKey = authBody.flatMap(body -> Optional.of(gson.fromJson(body, AuthenticationBody.class).access_token);;
Optional<HttpRequest> scoringRequest = authKey.flatMap(key -> Optional.of(scoringRequest(key, scoringUri, dataToBeScored)));
Optional<String> scoringResult = scoringRequest.flatMap(req -> getRequestBody(client, req));
// ... etc (`scoringResult.orElse()`) ...
static HttpRequest tokenAuthenticationRequest(String tenantId, String clientId, String clientSecret, String resourceManagerUrl)
{
String authUrl = String.format("https://login.chinacloudapi.cn/%s/oauth2/token", tenantId);
String clientIdParam = String.format("client_id=%s", clientId);
String resourceParam = String.format("resource=%s", resourceManagerUrl);
String clientSecretParam = String.format("client_secret=%s", clientSecret);
String bodyString = String.format("grant_type=client_credentials&%s&%s&%s", clientIdParam, resourceParam, clientSecretParam);
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(authUrl))
.POST(HttpRequest.BodyPublishers.ofString(bodyString))
.build();
return request;
}
static HttpRequest scoringRequest(String authKey, String scoringUri, String dataToBeScored)
{
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(scoringUri))
.header("Authorization", String.format("Token %s", authKey))
.POST(HttpRequest.BodyPublishers.ofString(dataToBeScored))
.build();
return request;
}
static Optional<String> getRequestBody(HttpClient client, HttpRequest request) {
try {
HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
if (response.statusCode() != 200) {
System.out.println(String.format("Unexpected server response %d", response.statusCode()));
return Optional.empty();
}
return Optional.of(response.body());
}catch(Exception x)
{
System.out.println(x.toString());
return Optional.empty();
}
}
class AuthenticationBody {
String access_token;
String token_type;
int expires_in;
String scope;
String refresh_token;
String id_token;
AuthenticationBody() {}
}
你可能想要对其他数据集和数据路径进行训练和推理。 例如,你可能想要对较小的数据集进行训练,对完整数据集进行推理。 可使用请求的 json
参数中的 DataSetDefinitionValueAssignments
键切换数据集。 可使用 DataPathAssignments
切换数据路径。 两者的方法类似:
在管道定义脚本中,为数据集创建
PipelineParameter
。 在PipelineParameter
中创建DatasetConsumptionConfig
或DataPath
:tabular_dataset = Dataset.Tabular.from_delimited_files('https://dprepdata.blob.core.chinacloudapi.cn/demo/Titanic.csv') tabular_pipeline_param = PipelineParameter(name="tabular_ds_param", default_value=tabular_dataset) tabular_ds_consumption = DatasetConsumptionConfig("tabular_dataset", tabular_pipeline_param)
在 ML 脚本中,使用
Run.get_context().input_datasets
访问动态指定的数据集:from azureml.core import Run input_tabular_ds = Run.get_context().input_datasets['tabular_dataset'] dataframe = input_tabular_ds.to_pandas_dataframe() # ... etc ...
请注意,ML 脚本访问为
DatasetConsumptionConfig
指定的值 (tabular_dataset
),不访问PipelineParameter
的值 (tabular_ds_param
)。在管道定义脚本中,将
DatasetConsumptionConfig
设置为PipelineScriptStep
的参数:train_step = PythonScriptStep( name="train_step", script_name="train_with_dataset.py", arguments=["--param1", tabular_ds_consumption], inputs=[tabular_ds_consumption], compute_target=compute_target, source_directory=source_directory) pipeline = Pipeline(workspace=ws, steps=[train_step])
若要在推理 REST 调用中动态切换数据集,请使用
DataSetDefinitionValueAssignments
:tabular_ds1 = Dataset.Tabular.from_delimited_files('path_to_training_dataset') tabular_ds2 = Dataset.Tabular.from_delimited_files('path_to_inference_dataset') ds1_id = tabular_ds1.id d22_id = tabular_ds2.id response = requests.post(rest_endpoint, headers=aad_token, json={ "ExperimentName": "MyRestPipeline", "DataSetDefinitionValueAssignments": { "tabular_ds_param": { "SavedDataSetReference": {"Id": ds1_id #or ds2_id }}}})
有关此方法的完整示例,请参阅展示数据集和 PipelineParameter 和展示数据路径和 PipelineParameter 这两个笔记本。
可以创建包含多个已发布管道的管道终结点。 在迭代和更新 ML 管道时,这种方法提供了一个固定的 REST 终结点。
from azureml.pipeline.core import PipelineEndpoint
published_pipeline = PublishedPipeline.get(workspace=ws, name="My_Published_Pipeline")
pipeline_endpoint = PipelineEndpoint.publish(workspace=ws, name="PipelineEndpointTest",
pipeline=published_pipeline, description="Test description Notebook")
可将作业提交到管道终结点的默认版本:
pipeline_endpoint_by_name = PipelineEndpoint.get(workspace=ws, name="PipelineEndpointTest")
run_id = pipeline_endpoint_by_name.submit("PipelineEndpointExperiment")
print(run_id)
还可将作业提交到特定的版本:
run_id = pipeline_endpoint_by_name.submit("PipelineEndpointExperiment", pipeline_version="0")
print(run_id)
可以使用 REST API 来完成相同的操作:
rest_endpoint = pipeline_endpoint_by_name.endpoint
response = requests.post(rest_endpoint,
headers=aad_token,
json={"ExperimentName": "PipelineEndpointExperiment",
"RunSource": "API",
"ParameterAssignments": {"1": "united", "2":"city"}})
也可以从工作室运行已发布的管道:
登录到 Azure 机器学习工作室。
在左侧选择“终结点”。
在顶部选择“管道终结点”。
选择要运行的特定管道,使用或查看管道终结点的先前运行的结果。
若要在已发布管道的列表中隐藏某个管道,请在工作室或 SDK 中禁用它:
# Get the pipeline by using its ID from Azure Machine Learning studio
p = PublishedPipeline.get(ws, id="068f4885-7088-424b-8ce2-eeb9ba5381a6")
p.disable()
可以使用 p.enable()
再次启用它。 有关详细信息,请参阅 PublishedPipeline 类参考。
- 使用 GitHub 上的这些 Jupyter Notebook 以进一步探索机器学习管道。
- 参阅有关 azureml-pipelines-core 包和 azureml-pipelines-steps 包的 SDK 参考帮助信息。
- 参阅操作指南,获取有关调试管道和排查管道问题的提示。