Tutorial 3: Enable recurrent materialization and run batch inference

This tutorial series shows how features seamlessly integrate all phases of the machine learning lifecycle: prototyping, training, and operationalization.

The first tutorial showed how to create a feature set specification with custom transformations. It then showed how to use that feature set to generate training data, enable materialization, and perform a backfill. The second tutorial showed how to enable materialization and perform a backfill. It also showed how to experiment with features, as a way to improve model performance.

This tutorial explains how to:

  • Enable recurrent materialization for the transactions feature set.
  • Run a batch inference pipeline on the registered model.

Prerequisites

Before you proceed with this tutorial, be sure to complete the first and second tutorials in the series.

Set up

  1. Configure the Azure Machine Learning Spark notebook.

    To run this tutorial, you can create a new notebook and execute the instructions, step by step. You can also open and run the existing notebook named 3. Enable recurrent materialization and run batch inference. You can find that notebook, and all the notebooks in this series, in the featurestore_sample/notebooks directory. You can choose sdk_only or sdk_and_cli. Keep this tutorial open and refer to it for documentation links and more explanation.

    1. In the Compute dropdown list in the top nav, select Serverless Spark Compute under Azure Machine Learning Serverless Spark.

    2. Configure the session:

      1. Select Configure session in the top status bar.
      2. Select the Python packages tab.
      3. Select Upload conda file.
      4. Select the azureml-examples/sdk/python/featurestore-sample/project/env/online.yml file from your local machine.
      5. Optionally, increase the session time-out (idle time) to avoid frequent prerequisite reruns.
  2. Start the Spark session.

    # run this cell to start the spark session (any code block will start the session ). This can take around 10 mins.
    print("start spark session")
    
  3. Set up the root directory for the samples.

    import os
    
    # please update the dir to ./Users/{your-alias} (or any custom directory you uploaded the samples to).
    # You can find the name from the directory structure inm the left nav
    root_dir = "./Users/<your user alias>/featurestore_sample"
    
    if os.path.isdir(root_dir):
        print("The folder exists.")
    else:
        print("The folder does not exist. Please create or fix the path")
    
  4. Set up the CLI.

    Not applicable.

  5. Initialize the project workspace CRUD (create, read, update, and delete) client.

    The tutorial notebook runs from this current workspace.

    ### Initialize the MLClient of this project workspace
    import os
    from azure.ai.ml import MLClient
    from azure.ai.ml.identity import AzureMLOnBehalfOfCredential
    
    project_ws_sub_id = os.environ["AZUREML_ARM_SUBSCRIPTION"]
    project_ws_rg = os.environ["AZUREML_ARM_RESOURCEGROUP"]
    project_ws_name = os.environ["AZUREML_ARM_WORKSPACE_NAME"]
    
    # connect to the project workspace
    ws_client = MLClient(
        AzureMLOnBehalfOfCredential(), project_ws_sub_id, project_ws_rg, project_ws_name
    )
    
  6. Initialize the feature store variables.

    To reflect what you created in the first tutorial, be sure to update the featurestore_name value.

    from azure.ai.ml import MLClient
    from azure.ai.ml.identity import AzureMLOnBehalfOfCredential
    
    # feature store
    featurestore_name = "my-featurestore"  # use the same name from part #1 of the tutorial
    featurestore_subscription_id = os.environ["AZUREML_ARM_SUBSCRIPTION"]
    featurestore_resource_group_name = os.environ["AZUREML_ARM_RESOURCEGROUP"]
    
    # feature store ml client
    fs_client = MLClient(
        AzureMLOnBehalfOfCredential(),
        featurestore_subscription_id,
        featurestore_resource_group_name,
        featurestore_name,
    )
    
  7. Initialize the feature store SDK client.

# feature store client
from azureml.featurestore import FeatureStoreClient
from azure.ai.ml.identity import AzureMLOnBehalfOfCredential

featurestore = FeatureStoreClient(
    credential=AzureMLOnBehalfOfCredential(),
    subscription_id=featurestore_subscription_id,
    resource_group_name=featurestore_resource_group_name,
    name=featurestore_name,
)

Enable recurrent materialization on the transactions feature set

In the second tutorial, you enabled materialization and performed backfill on the transactions feature set. Backfill is an on-demand, one-time operation that computes and places feature values in the materialization store.

To handle inference of the model in production, you might want to set up recurrent materialization jobs to keep the materialization store up to date. These jobs run on user-defined schedules. The recurrent job schedule works this way:

  • Interval and frequency values define a window. For example, the following values define a three-hour window:

    • interval = 3
    • frequency = Hour
  • The first window starts at the start_time value defined in RecurrenceTrigger, and so on.

  • The first recurrent job is submitted at the start of the next window after the update time.

  • Later recurrent jobs are submitted at every window after the first job.

As explained in earlier tutorials, after data is materialized (backfill or recurrent materialization), feature retrieval uses the materialized data by default.

from datetime import datetime
from azure.ai.ml.entities import RecurrenceTrigger

transactions_fset_config = fs_client.feature_sets.get(name="transactions", version="1")

# create a schedule that runs the materialization job every 3 hours
transactions_fset_config.materialization_settings.schedule = RecurrenceTrigger(
    interval=3, frequency="Hour", start_time=datetime(2023, 4, 15, 0, 4, 10, 0)
)

fs_poller = fs_client.feature_sets.begin_create_or_update(transactions_fset_config)

print(fs_poller.result())

(Optional) Save the YAML file for the feature set asset

You use the updated settings to save the YAML file.

## uncomment and run
# transactions_fset_config.dump(root_dir + "/featurestore/featuresets/transactions/featureset_asset_offline_enabled_with_schedule.yaml")

Run the batch inference pipeline

The batch inference has these steps:

  1. You use the same built-in feature retrieval component for feature retrieval that you used in the training pipeline (covered in the third tutorial). For pipeline training, you provided a feature retrieval specification as a component input. For batch inference, you pass the registered model as the input. The component looks for the feature retrieval specification in the model artifact.

    Additionally, for training, the observation data had the target variable. However, the batch inference observation data doesn't have the target variable. The feature retrieval step joins the observation data with the features and outputs the data for batch inference.

  2. The pipeline uses the batch inference input data from previous step, runs inference on the model, and appends the predicted value as output.

    Note

    You use a job for batch inference in this example. You can also use batch endpoints in Azure Machine Learning.

    from azure.ai.ml import load_job  # will be used later
    
    # set the batch inference  pipeline path
    batch_inference_pipeline_path = (
        root_dir + "/project/fraud_model/pipelines/batch_inference_pipeline.yaml"
    )
    batch_inference_pipeline_definition = load_job(source=batch_inference_pipeline_path)
    
    # run the training pipeline
    batch_inference_pipeline_job = ws_client.jobs.create_or_update(
        batch_inference_pipeline_definition
    )
    
    # stream the run logs
    ws_client.jobs.stream(batch_inference_pipeline_job.name)
    

Inspect the output data for batch inference

In the pipeline view:

  1. Select inference_step in the outputs card.

  2. Copy the Data field value. It looks something like azureml_995abbc2-3171-461e-8214-c3c5d17ede83_output_data_data_with_prediction:1.

  3. Paste the Data field value in the following cell, with separate name and version values. The last character is the version, preceded by a colon (:).

  4. Note the predict_is_fraud column that the batch inference pipeline generated.

    In the batch inference pipeline (/project/fraud_mode/pipelines/batch_inference_pipeline.yaml) outputs, the system created an untracked data asset with a GUID as the name value and 1 as the version value. This happened because you didn't provide name or version values for outputs of inference_step. In this cell, you derive and then display the data path from the asset.

    inf_data_output = ws_client.data.get(
        name="azureml_1c106662-aa5e-4354-b5f9-57c1b0fdb3a7_output_data_data_with_prediction",
        version="1",
    )
    inf_output_df = spark.read.parquet(inf_data_output.path)
    display(inf_output_df.head(5))
    

Clean up

The fifth tutorial in the series describes how to delete the resources.

Next steps