教程 3:启用循环具体化并运行批量推理

本系列教程介绍了特征如何无缝集成机器学习生命周期的所有阶段:原型制作、训练和操作化。

第一个教程介绍了如何使用自定义转换创建特征集规范。 然后,它介绍了如何使用该特征集生成训练数据、启用具体化和执行回填。 教程 2 介绍了如何实现具体化和执行回填。 还演示了如何试验特征以提高模型性能。

本教程介绍了如何完成以下操作:

  • transactions 特征集启用循环具体化。
  • 在已注册的模型上运行批量推理管道。

先决条件

在继续学习此教程之前,请务必完成系列中的第一个和第二个教程。

设置

  1. 配置 Azure 机器学习 Spark 笔记本。

    要运行本教程,可以创建一个新笔记本,并按说明分步执行。 还可以打开并运行名为 3.启用循环具体化并运行批量推理的现有笔记本。 可以在 featurestore_sample/notebooks 目录中找到该笔记本以及该系列中的所有笔记本。 可以选择 sdk_only 或 sdk_and_cli。 可以将此教程保持在打开状态,并参考它来获取文档链接和更多说明。

    1. 在顶部导航栏的“计算”下拉列表中,选择“Azure 机器学习无服务器 Spark”下的“无服务器 Spark 计算”。

    2. 配置会话:

      1. 在顶部状态栏中选择“配置会话”
      2. 选择“Python 包”选项卡。
      3. 选择“上传 conda 文件”
      4. 从本地计算机中选择 azureml-examples/sdk/python/featurestore-sample/project/env/online.yml 文件。
      5. (可选)增加会话超时(空闲时间)以避免频繁重新运行先决条件。
  2. 启动 Spark 会话。

    # run this cell to start the spark session (any code block will start the session ). This can take around 10 mins.
    print("start spark session")
    
  3. 为示例设置根目录。

    import os
    
    # please update the dir to ./Users/{your-alias} (or any custom directory you uploaded the samples to).
    # You can find the name from the directory structure inm the left nav
    root_dir = "./Users/<your user alias>/featurestore_sample"
    
    if os.path.isdir(root_dir):
        print("The folder exists.")
    else:
        print("The folder does not exist. Please create or fix the path")
    
  4. 设置 CLI。

    不适用。

  5. 将项目工作区 CRUD(创建、读取、更新和删除)客户端初始化。

    教程笔记本从此当前工作区运行。

    ### Initialize the MLClient of this project workspace
    import os
    from azure.ai.ml import MLClient
    from azure.ai.ml.identity import AzureMLOnBehalfOfCredential
    
    project_ws_sub_id = os.environ["AZUREML_ARM_SUBSCRIPTION"]
    project_ws_rg = os.environ["AZUREML_ARM_RESOURCEGROUP"]
    project_ws_name = os.environ["AZUREML_ARM_WORKSPACE_NAME"]
    
    # connect to the project workspace
    ws_client = MLClient(
        AzureMLOnBehalfOfCredential(), project_ws_sub_id, project_ws_rg, project_ws_name
    )
    
  6. 初始化特征存储变量。

    若要反映第一个教程中创建的内容,请务必更新 featurestore_name 值。

    from azure.ai.ml import MLClient
    from azure.ai.ml.identity import AzureMLOnBehalfOfCredential
    
    # feature store
    featurestore_name = "my-featurestore"  # use the same name from part #1 of the tutorial
    featurestore_subscription_id = os.environ["AZUREML_ARM_SUBSCRIPTION"]
    featurestore_resource_group_name = os.environ["AZUREML_ARM_RESOURCEGROUP"]
    
    # feature store ml client
    fs_client = MLClient(
        AzureMLOnBehalfOfCredential(),
        featurestore_subscription_id,
        featurestore_resource_group_name,
        featurestore_name,
    )
    
  7. 将功能商店 SDK 客户端初始化。

# feature store client
from azureml.featurestore import FeatureStoreClient
from azure.ai.ml.identity import AzureMLOnBehalfOfCredential

featurestore = FeatureStoreClient(
    credential=AzureMLOnBehalfOfCredential(),
    subscription_id=featurestore_subscription_id,
    resource_group_name=featurestore_resource_group_name,
    name=featurestore_name,
)

在事务特征集上启用循环具体化

在第二个教程中,启用了具体化并在 transactions 功能集上执行了回填。 回填是一种按需的一次性操作,用于计算特征值并将其放置在具体化存储中。

要在生产中处理模型的推理,可能需要设置循环的具体化作业,以使具体化存储保持最新。 这些作业按用户定义的计划运行。 重复作业计划的工作方式如下:

  • Interval 和 frequency 的值定义窗口。 例如,以下值定义了一个三小时的时段:

    • interval = 3
    • frequency = Hour
  • 第一个窗口从 RecurrenceTrigger 中定义的 start_time 值开始,依此类推。

  • 第一个重复作业在更新时间后的下一个窗口开始时提交。

  • 以后的重复作业会在第一个作业后的每个窗口提交。

如本教程前面部分所述,数据被具体化(回填或循环具体化)后,特征检索默认使用具体化数据。

from datetime import datetime
from azure.ai.ml.entities import RecurrenceTrigger

transactions_fset_config = fs_client.feature_sets.get(name="transactions", version="1")

# create a schedule that runs the materialization job every 3 hours
transactions_fset_config.materialization_settings.schedule = RecurrenceTrigger(
    interval=3, frequency="Hour", start_time=datetime(2023, 4, 15, 0, 4, 10, 0)
)

fs_poller = fs_client.feature_sets.begin_create_or_update(transactions_fset_config)

print(fs_poller.result())

(可选)保存功能集资产的 YAML 文件

你使用更新的设置来保存 YAML 文件。

## uncomment and run
# transactions_fset_config.dump(root_dir + "/featurestore/featuresets/transactions/featureset_asset_offline_enabled_with_schedule.yaml")

运行批量推理管道

批量推理包含以下步骤:

  1. 你使用与在培训管道中所用相同的内置特征检索组件进行特征检索(第三个教程中有介绍)。 对于管道训练,你提供了一个特征检索规范作为组件输入。 对于批量推理,将已注册的模型作为输入传递。 该组件在模型项目中查找特征检索规范。

    此外,出于训练目的,观察数据具有目标变量。 然而,批量推理观察数据不具有目标变量。 特征检索步骤将观察数据与特征连接,并输出用于批量推理的数据。

  2. 该管道使用上一步骤中的批量推理输入数据,对模型运行推理,并将预测值附加为输出。

    注意

    本例中使用作业进行批量推理。 还可以在 Azure 机器学习中使用批处理终结点。

    from azure.ai.ml import load_job  # will be used later
    
    # set the batch inference  pipeline path
    batch_inference_pipeline_path = (
        root_dir + "/project/fraud_model/pipelines/batch_inference_pipeline.yaml"
    )
    batch_inference_pipeline_definition = load_job(source=batch_inference_pipeline_path)
    
    # run the training pipeline
    batch_inference_pipeline_job = ws_client.jobs.create_or_update(
        batch_inference_pipeline_definition
    )
    
    # stream the run logs
    ws_client.jobs.stream(batch_inference_pipeline_job.name)
    

检查批量推理的输出数据

在管道视图中:

  1. outputs 卡中选择 inference_step

  2. 复制 Data 字段值。 它类似于 azureml_995abbc2-3171-461e-8214-c3c5d17ede83_output_data_data_with_prediction:1

  3. Data 字段值粘贴到以下单元格中,并使用不同的名称和版本值。 最后一个字符是版本,前面带一个冒号 (:)。

  4. 请注意,批量推理管道生成了 predict_is_fraud 列。

    在批量推理管道 (/project/fraud_mode/pipelines/batch_inference_pipeline.yaml) 输出中,系统创建了一个未跟踪的数据资产,其中 GUID 作为名称值,1 作为版本值。 发生这种情况的原因是,你没有未 inference_stepoutputs 提供 nameversion 值。 在此单元格中,将从资产中派生数据路径并显示。

    inf_data_output = ws_client.data.get(
        name="azureml_1c106662-aa5e-4354-b5f9-57c1b0fdb3a7_output_data_data_with_prediction",
        version="1",
    )
    inf_output_df = spark.read.parquet(inf_data_output.path)
    display(inf_output_df.head(5))
    

清理

本系列的教程五将介绍如何删除资源。

后续步骤