Tutorial: Build an Azure Machine Learning pipeline for batch scoring
In this advanced tutorial, you learn how to build an Azure Machine Learning pipeline to run a batch scoring job. Machine learning pipelines optimize your workflow with speed, portability, and reuse, so you can focus on machine learning instead of infrastructure and automation. After you build and publish a pipeline, you configure a REST endpoint that you can use to trigger the pipeline from any HTTP library on any platform.
The example uses a pretrained Inception-V3 convolutional neural network model implemented in TensorFlow to classify unlabeled images.
In this tutorial, you complete the following tasks:
- Configure workspace
- Download and store sample data
- Create dataset objects to fetch and output data
- Download, prepare, and register the model in your workspace
- Provision compute targets and create a scoring script
- Use the ParallelRunStep class for async batch scoring
- Build, run, and publish a pipeline
- Enable a REST endpoint for the pipeline
If you don't have an Azure subscription, create a trial account before you begin.
Prerequisites
- Complete the Quickstart: Get started with Azure Machine Learning if you don't already have an Azure Machine Learning workspace or a compute instance.
- After you complete the quickstart:
- Select Notebooks in the studio.
- Select the Samples tab.
- Open the tutorials/machine-learning-pipelines-advanced/tutorial-pipeline-batch-scoring-classification.ipynb notebook.
If you want to run the setup tutorial in your own local environment, you can access the tutorial on GitHub. Run pip install azureml-sdk[notebooks] azureml-pipeline-core azureml-pipeline-steps pandas requests
to get the required packages.
Configure workspace and create a datastore
Create a workspace object from the existing Azure Machine Learning workspace.
from azureml.core import Workspace
ws = Workspace.from_config()
Important
This code snippet expects the workspace configuration to be saved in the current directory or its parent. For more information on creating a workspace, see Create and manage Azure Machine Learning workspaces. For more information on saving the configuration to file, see Create a workspace configuration file.
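If you haven't saved a configuration file yet, a minimal sketch looks like the following; the subscription ID, resource group, and workspace name are placeholders you supply, and Workspace.get and write_config are part of the azureml-core SDK:

```python
from azureml.core import Workspace

# Retrieve an existing workspace (placeholder values shown).
ws = Workspace.get(name="<workspace-name>",
                   subscription_id="<subscription-id>",
                   resource_group="<resource-group>")

# Write config.json locally so Workspace.from_config() can find it later.
ws.write_config()
```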
Create a datastore for sample images
On the pipelinedata account, get the ImageNet evaluation public data sample from the sampledata public blob container. Call register_azure_blob_container() to make the data available to the workspace under the name images_datastore. Then, set the workspace default datastore as the output datastore. Use the output datastore to score output in the pipeline.
For more information on accessing data, see How to access data.
from azureml.core.datastore import Datastore
batchscore_blob = Datastore.register_azure_blob_container(ws,
                      datastore_name="images_datastore",
                      container_name="sampledata",
                      account_name="pipelinedata",
                      overwrite=True)
def_data_store = ws.get_default_datastore()
Create dataset objects
When building pipelines, Dataset
objects are used for reading data from workspace datastores, and OutputFileDatasetConfig
objects are used for transferring intermediate data between pipeline steps.
Important
The batch scoring example in this tutorial uses only one pipeline step. In use cases that have multiple steps, the typical flow will include these steps:
1. Use Dataset objects as inputs to fetch raw data, perform some transformation, and then output with an OutputFileDatasetConfig object.
2. Use the OutputFileDatasetConfig output object in the preceding step as an input object. Repeat it for subsequent steps.
In this scenario, you create Dataset
objects that correspond to the datastore directories for both the input images and the classification labels (y-test values). You also create an OutputFileDatasetConfig
object for the batch scoring output data.
from azureml.core.dataset import Dataset
from azureml.data import OutputFileDatasetConfig
input_images = Dataset.File.from_files((batchscore_blob, "batchscoring/images/"))
label_ds = Dataset.File.from_files((batchscore_blob, "batchscoring/labels/"))
output_dir = OutputFileDatasetConfig(name="scores")
Register the datasets to the workspace if you want to reuse them later. This step is optional.
input_images = input_images.register(workspace = ws, name = "input_images")
label_ds = label_ds.register(workspace = ws, name = "label_ds")
Download and register the model
Download the pretrained TensorFlow model to use it for batch scoring in a pipeline. First, create a local directory where you store the model. Then, download and extract the model.
import os
import tarfile
import urllib.request
if not os.path.isdir("models"):
os.mkdir("models")
response = urllib.request.urlretrieve("http://download.tensorflow.org/models/inception_v3_2016_08_28.tar.gz", "model.tar.gz")
tar = tarfile.open("model.tar.gz", "r:gz")
tar.extractall("models")
Next, register the model to your workspace, so you can easily retrieve the model in the pipeline process. In the register()
static function, the model_name
parameter is the key you use to locate your model throughout the SDK.
from azureml.core.model import Model
model = Model.register(model_path="models/inception_v3.ckpt",
                       model_name="inception",
                       tags={"pretrained": "inception"},
                       description="Imagenet trained tensorflow inception",
                       workspace=ws)
Create and attach the remote compute target
Machine learning pipelines can't be run locally, so you run them on cloud resources or remote compute targets. A remote compute target is a reusable virtual compute environment where you run experiments and machine learning workflows.
Run the following code to create a GPU-enabled AmlCompute
target, and then attach it to your workspace. For more information about compute targets, see the conceptual article.
from azureml.core.compute import AmlCompute, ComputeTarget
from azureml.exceptions import ComputeTargetException
compute_name = "gpu-cluster"
# checks to see if compute target already exists in workspace, else create it
try:
    compute_target = ComputeTarget(workspace=ws, name=compute_name)
except ComputeTargetException:
    config = AmlCompute.provisioning_configuration(vm_size="STANDARD_NC6",
                                                   vm_priority="lowpriority",
                                                   min_nodes=0,
                                                   max_nodes=1)
    compute_target = ComputeTarget.create(workspace=ws, name=compute_name, provisioning_configuration=config)
    compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)
Write a scoring script
To do the scoring, create a batch scoring script called batch_scoring.py
, and then write it to the current directory. The script takes input images, applies the classification model, and then outputs the predictions to a results file.
The batch_scoring.py
script takes the following parameters, which get passed from the ParallelRunStep
you create later:
- --model_name: The name of the model being used.
- --labels_dir: The location of the labels.txt file.
The pipeline infrastructure uses the ArgumentParser
class to pass parameters into pipeline steps. For example, in the following code, the first argument --model_name
is given the property identifier model_name. In the init()
function, Model.get_model_path(args.model_name)
is used to access this property.
%%writefile batch_scoring.py
import os
import argparse
import datetime
import time
import tensorflow as tf
from math import ceil
import numpy as np
import shutil
from tensorflow.contrib.slim.python.slim.nets import inception_v3
from azureml.core import Run
from azureml.core.model import Model
from azureml.core.dataset import Dataset
slim = tf.contrib.slim
image_size = 299
num_channel = 3
def get_class_label_dict(labels_dir):
    label = []
    labels_path = os.path.join(labels_dir, 'labels.txt')
    proto_as_ascii_lines = tf.gfile.GFile(labels_path).readlines()
    for l in proto_as_ascii_lines:
        label.append(l.rstrip())
    return label

def init():
    global g_tf_sess, probabilities, label_dict, input_images

    parser = argparse.ArgumentParser(description="Start a tensorflow model serving")
    parser.add_argument('--model_name', dest="model_name", required=True)
    parser.add_argument('--labels_dir', dest="labels_dir", required=True)
    args, _ = parser.parse_known_args()

    label_dict = get_class_label_dict(args.labels_dir)
    classes_num = len(label_dict)

    with slim.arg_scope(inception_v3.inception_v3_arg_scope()):
        input_images = tf.placeholder(tf.float32, [1, image_size, image_size, num_channel])
        logits, _ = inception_v3.inception_v3(input_images,
                                              num_classes=classes_num,
                                              is_training=False)
        probabilities = tf.argmax(logits, 1)

    config = tf.ConfigProto()
    config.gpu_options.allow_growth = True
    g_tf_sess = tf.Session(config=config)
    g_tf_sess.run(tf.global_variables_initializer())
    g_tf_sess.run(tf.local_variables_initializer())

    model_path = Model.get_model_path(args.model_name)
    saver = tf.train.Saver()
    saver.restore(g_tf_sess, model_path)

def file_to_tensor(file_path):
    image_string = tf.read_file(file_path)
    image = tf.image.decode_image(image_string, channels=3)

    image.set_shape([None, None, None])
    image = tf.image.resize_images(image, [image_size, image_size])
    image = tf.divide(tf.subtract(image, [0]), [255])
    image.set_shape([image_size, image_size, num_channel])
    return image

def run(mini_batch):
    result_list = []
    for file_path in mini_batch:
        test_image = file_to_tensor(file_path)
        out = g_tf_sess.run(test_image)
        result = g_tf_sess.run(probabilities, feed_dict={input_images: [out]})
        result_list.append(os.path.basename(file_path) + ": " + label_dict[result[0]])
    return result_list
Tip
The pipeline in this tutorial has only one step, and it writes the output to a file. For multi-step pipelines, you also use ArgumentParser
to define a directory to write output data for input to subsequent steps. For an example of passing data between multiple pipeline steps by using the ArgumentParser
design pattern, see the notebook.
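For illustration only, a minimal sketch of such a step script, assuming a hypothetical prepare.py that receives its output directory through an --output_dir argument supplied by the pipeline as an OutputFileDatasetConfig path:

```python
# Hypothetical prepare.py for a multi-step pipeline (not part of this tutorial).
import argparse
import os

parser = argparse.ArgumentParser()
parser.add_argument("--output_dir", dest="output_dir", required=True)
args, _ = parser.parse_known_args()

# Write intermediate data that a subsequent step can consume as an input.
os.makedirs(args.output_dir, exist_ok=True)
with open(os.path.join(args.output_dir, "prepared.txt"), "w") as f:
    f.write("intermediate data for the next step\n")
```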
Build the pipeline
Before you run the pipeline, create an object that defines the Python environment and the dependencies that your batch_scoring.py script requires. The main dependency is TensorFlow, but you also install azureml-core and azureml-dataprep[fuse], which are required by ParallelRunStep. Also, specify Docker and Docker-GPU support.
from azureml.core import Environment
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import DEFAULT_GPU_IMAGE
cd = CondaDependencies.create(pip_packages=["tensorflow-gpu==1.15.2",
                                            "azureml-core", "azureml-dataprep[fuse]"])
env = Environment(name="parallelenv")
env.python.conda_dependencies = cd
env.docker.base_image = DEFAULT_GPU_IMAGE
Create the configuration to wrap the script
Create the pipeline step using the script, environment configuration, and parameters. Specify the compute target you already attached to your workspace.
from azureml.pipeline.steps import ParallelRunConfig
parallel_run_config = ParallelRunConfig(
    environment=env,
    entry_script="batch_scoring.py",
    source_directory=".",
    output_action="append_row",
    mini_batch_size="20",
    error_threshold=1,
    compute_target=compute_target,
    process_count_per_node=2,
    node_count=1
)
Create the pipeline step
A pipeline step is an object that encapsulates everything you need to run a pipeline, including:
- Environment and dependency settings
- The compute resource to run the pipeline on
- Input and output data, and any custom parameters
- Reference to a script or SDK logic to run during the step
Multiple classes inherit from the parent class PipelineStep
. You can choose classes to use specific frameworks or stacks to build a step. In this example, you use the ParallelRunStep
class to define your step logic by using a custom Python script. If an argument to your script is either an input to the step or an output of the step, the argument must be defined both in the arguments
array and in either the inputs or the output parameter, respectively.
In scenarios where there is more than one step, an object reference in the outputs
array becomes available as an input for a subsequent pipeline step.
from azureml.pipeline.steps import ParallelRunStep
from datetime import datetime
parallel_step_name = "batchscoring-" + datetime.now().strftime("%Y%m%d%H%M")
label_config = label_ds.as_named_input("labels_input")
batch_score_step = ParallelRunStep(
    name=parallel_step_name,
    inputs=[input_images.as_named_input("input_images")],
    output=output_dir,
    arguments=["--model_name", "inception",
               "--labels_dir", label_config],
    side_inputs=[label_config],
    parallel_run_config=parallel_run_config,
    allow_reuse=False
)
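In a multi-step scenario, a hedged sketch of wiring the scoring output into a follow-on step might look like the following. The aggregate.py script and the step name are hypothetical; output_dir.as_input() and PythonScriptStep come from the azureml SDK:

```python
from azureml.pipeline.steps import PythonScriptStep

# Hypothetical follow-on step that consumes the scoring output as an input.
# aggregate.py is a placeholder script that would read the mounted scores directory.
scores_as_input = output_dir.as_input()

aggregate_step = PythonScriptStep(
    name="aggregate-results",
    script_name="aggregate.py",
    source_directory=".",
    arguments=["--scores_dir", scores_as_input],
    compute_target=compute_target,
    allow_reuse=False
)
```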
For a list of all the classes you can use for different step types, see the steps package.
Submit the pipeline
Now, run the pipeline. First, create a Pipeline
object by using your workspace reference and the pipeline step you created. The steps
parameter is an array of steps. In this case, there's only one step for batch scoring. To build pipelines that have multiple steps, place the steps in order in this array.
Next, use the Experiment.submit()
function to submit the pipeline for execution. The wait_for_completion
function outputs logs during the pipeline build process. You can use the logs to see current progress.
Important
The first pipeline run takes roughly 15 minutes. All dependencies must be downloaded, a Docker image is created, and the Python environment is provisioned and created. Running the pipeline again takes significantly less time because those resources are reused instead of created. However, total run time for the pipeline depends on the workload of your scripts and the processes that are running in each pipeline step.
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline
pipeline = Pipeline(workspace=ws, steps=[batch_score_step])
pipeline_run = Experiment(ws, 'Tutorial-Batch-Scoring').submit(pipeline)
pipeline_run.wait_for_completion(show_output=True)
Download and review output
Run the following code to download the output file that's created from the batch_scoring.py
script. Then, explore the scoring results.
import pandas as pd
batch_run = next(pipeline_run.get_children())
batch_output = batch_run.get_output_data("scores")
batch_output.download(local_path="inception_results")
for root, dirs, files in os.walk("inception_results"):
    for file in files:
        if file.endswith("parallel_run_step.txt"):
            result_file = os.path.join(root, file)

df = pd.read_csv(result_file, delimiter=":", header=None)
df.columns = ["Filename", "Prediction"]
print("Prediction has ", df.shape[0], " rows")
df.head(10)
Publish and run from a REST endpoint
Run the following code to publish the pipeline to your workspace. In your workspace in Azure Machine Learning studio, you can see metadata for the pipeline, including run history and durations. You can also run the pipeline manually from the studio.
Publishing the pipeline enables a REST endpoint that you can use to run the pipeline from any HTTP library on any platform.
published_pipeline = pipeline_run.publish_pipeline(
    name="Inception_v3_scoring", description="Batch scoring using Inception v3 model", version="1.0")
published_pipeline
To run the pipeline from the REST endpoint, you need an OAuth2 Bearer-type authentication header. The following example uses interactive authentication (for illustration purposes), but for most production scenarios that require automated or headless authentication, use service principal authentication as described in this article.
Service principal authentication involves creating an App Registration in Azure Active Directory. First, you generate a client secret, and then you grant your service principal role access to your machine learning workspace. Use the ServicePrincipalAuthentication
class to manage your authentication flow.
Both InteractiveLoginAuthentication
and ServicePrincipalAuthentication
inherit from AbstractAuthentication
. In both cases, use the get_authentication_header()
function in the same way to fetch the header:
from azureml.core.authentication import InteractiveLoginAuthentication
interactive_auth = InteractiveLoginAuthentication()
auth_header = interactive_auth.get_authentication_header()
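For automated scenarios, a minimal sketch using ServicePrincipalAuthentication might look like the following; the tenant ID, service principal (application) ID, and client secret are placeholders you supply from your own App Registration:

```python
from azureml.core.authentication import ServicePrincipalAuthentication

# Placeholder values from your own Azure AD App Registration.
sp_auth = ServicePrincipalAuthentication(
    tenant_id="<tenant-id>",
    service_principal_id="<application-id>",
    service_principal_password="<client-secret>")

auth_header = sp_auth.get_authentication_header()
```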
Get the REST URL from the endpoint
property of the published pipeline object. You can also find the REST URL in your workspace in Azure Machine Learning studio.
Build an HTTP POST request to the endpoint. Specify your authentication header in the request. Add a JSON payload object that has the experiment name.
Make the request to trigger the run. Include code to access the Id
key from the response dictionary to get the value of the run ID.
import requests
rest_endpoint = published_pipeline.endpoint
response = requests.post(rest_endpoint,
                         headers=auth_header,
                         json={"ExperimentName": "Tutorial-Batch-Scoring",
                               "ParameterAssignments": {"process_count_per_node": 6}})
run_id = response.json()["Id"]
Use the run ID to monitor the status of the new run. The new run takes another 10 to 15 minutes to finish.
The new run will look similar to the pipeline you ran earlier in the tutorial. You can choose not to view the full output.
from azureml.pipeline.core.run import PipelineRun
from azureml.widgets import RunDetails
published_pipeline_run = PipelineRun(ws.experiments["Tutorial-Batch-Scoring"], run_id)
RunDetails(published_pipeline_run).show()
Clean up resources
Don't complete this section if you plan to run other Azure Machine Learning tutorials.
Stop the compute instance
If you used a compute instance, stop the VM when you aren't using it to reduce cost.
- In your workspace, select Compute.
- From the list, select the name of the compute instance.
- Select Stop.
- When you're ready to use the server again, select Start.
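If you prefer the SDK, a hedged sketch for stopping a compute instance looks like the following; the instance name is a placeholder for the name shown under Compute in the studio:

```python
from azureml.core.compute import ComputeInstance

# Placeholder name; substitute your own compute instance name.
instance = ComputeInstance(workspace=ws, name="<compute-instance-name>")
instance.stop(show_output=True)
```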
Delete everything
If you don't plan to use the resources you created, delete them, so you don't incur any charges:
- In the Azure portal, in the left menu, select Resource groups.
- In the list of resource groups, select the resource group you created.
- Select Delete resource group.
- Enter the resource group name. Then, select Delete.
You can also keep the resource group but delete a single workspace. Display the workspace properties, and then select Delete.
Next steps
In this machine learning pipelines tutorial, you did the following tasks:
- Built a pipeline with environment dependencies to run on a remote GPU compute resource.
- Created a scoring script to run batch predictions by using a pretrained TensorFlow model.
- Published a pipeline and enabled it to be run from a REST endpoint.
For more examples of how to build pipelines by using the machine learning SDK, see the notebook repository.