教程 3:启用循环具体化并运行批量推理
本系列教程介绍了特征如何无缝集成机器学习生命周期的所有阶段:原型制作、训练和操作化。
第一个教程介绍了如何使用自定义转换创建特征集规范。 然后,它介绍了如何使用该特征集生成训练数据、启用具体化和执行回填。 教程 2 介绍了如何实现具体化和执行回填。 还演示了如何试验特征以提高模型性能。
本教程介绍了如何完成以下操作:
- 为
transactions
特征集启用循环具体化。 - 在已注册的模型上运行批量推理管道。
先决条件
在继续学习此教程之前,请务必完成系列中的第一个和第二个教程。
设置
配置 Azure 机器学习 Spark 笔记本。
要运行本教程,可以创建一个新笔记本,并按说明分步执行。 还可以打开并运行名为 3.启用循环具体化并运行批量推理的现有笔记本。 可以在 featurestore_sample/notebooks 目录中找到该笔记本以及该系列中的所有笔记本。 可以选择 sdk_only 或 sdk_and_cli。 可以将此教程保持在打开状态,并参考它来获取文档链接和更多说明。
在顶部导航栏的“计算”下拉列表中,选择“Azure 机器学习无服务器 Spark”下的“无服务器 Spark 计算”。
配置会话:
- 在顶部状态栏中选择“配置会话”。
- 选择“Python 包”选项卡。
- 选择“上传 conda 文件”。
- 从本地计算机中选择
azureml-examples/sdk/python/featurestore-sample/project/env/online.yml
文件。 - (可选)增加会话超时(空闲时间)以避免频繁重新运行先决条件。
启动 Spark 会话。
# run this cell to start the spark session (any code block will start the session ). This can take around 10 mins. print("start spark session")
为示例设置根目录。
import os # please update the dir to ./Users/{your-alias} (or any custom directory you uploaded the samples to). # You can find the name from the directory structure inm the left nav root_dir = "./Users/<your user alias>/featurestore_sample" if os.path.isdir(root_dir): print("The folder exists.") else: print("The folder does not exist. Please create or fix the path")
设置 CLI。
不适用。
将项目工作区 CRUD(创建、读取、更新和删除)客户端初始化。
教程笔记本从此当前工作区运行。
### Initialize the MLClient of this project workspace import os from azure.ai.ml import MLClient from azure.ai.ml.identity import AzureMLOnBehalfOfCredential project_ws_sub_id = os.environ["AZUREML_ARM_SUBSCRIPTION"] project_ws_rg = os.environ["AZUREML_ARM_RESOURCEGROUP"] project_ws_name = os.environ["AZUREML_ARM_WORKSPACE_NAME"] # connect to the project workspace ws_client = MLClient( AzureMLOnBehalfOfCredential(), project_ws_sub_id, project_ws_rg, project_ws_name )
初始化特征存储变量。
若要反映第一个教程中创建的内容,请务必更新
featurestore_name
值。from azure.ai.ml import MLClient from azure.ai.ml.identity import AzureMLOnBehalfOfCredential # feature store featurestore_name = "my-featurestore" # use the same name from part #1 of the tutorial featurestore_subscription_id = os.environ["AZUREML_ARM_SUBSCRIPTION"] featurestore_resource_group_name = os.environ["AZUREML_ARM_RESOURCEGROUP"] # feature store ml client fs_client = MLClient( AzureMLOnBehalfOfCredential(), featurestore_subscription_id, featurestore_resource_group_name, featurestore_name, )
将功能商店 SDK 客户端初始化。
# feature store client
from azureml.featurestore import FeatureStoreClient
from azure.ai.ml.identity import AzureMLOnBehalfOfCredential
featurestore = FeatureStoreClient(
credential=AzureMLOnBehalfOfCredential(),
subscription_id=featurestore_subscription_id,
resource_group_name=featurestore_resource_group_name,
name=featurestore_name,
)
在事务特征集上启用循环具体化
在第二个教程中,启用了具体化并在 transactions
功能集上执行了回填。 回填是一种按需的一次性操作,用于计算特征值并将其放置在具体化存储中。
要在生产中处理模型的推理,可能需要设置循环的具体化作业,以使具体化存储保持最新。 这些作业按用户定义的计划运行。 重复作业计划的工作方式如下:
Interval 和 frequency 的值定义窗口。 例如,以下值定义了一个三小时的时段:
interval
=3
frequency
=Hour
第一个窗口从
RecurrenceTrigger
中定义的start_time
值开始,依此类推。第一个重复作业在更新时间后的下一个窗口开始时提交。
以后的重复作业会在第一个作业后的每个窗口提交。
如本教程前面部分所述,数据被具体化(回填或循环具体化)后,特征检索默认使用具体化数据。
from datetime import datetime
from azure.ai.ml.entities import RecurrenceTrigger
transactions_fset_config = fs_client.feature_sets.get(name="transactions", version="1")
# create a schedule that runs the materialization job every 3 hours
transactions_fset_config.materialization_settings.schedule = RecurrenceTrigger(
interval=3, frequency="Hour", start_time=datetime(2023, 4, 15, 0, 4, 10, 0)
)
fs_poller = fs_client.feature_sets.begin_create_or_update(transactions_fset_config)
print(fs_poller.result())
(可选)保存功能集资产的 YAML 文件
你使用更新的设置来保存 YAML 文件。
## uncomment and run
# transactions_fset_config.dump(root_dir + "/featurestore/featuresets/transactions/featureset_asset_offline_enabled_with_schedule.yaml")
运行批量推理管道
批量推理包含以下步骤:
你使用与在培训管道中所用相同的内置特征检索组件进行特征检索(第三个教程中有介绍)。 对于管道训练,你提供了一个特征检索规范作为组件输入。 对于批量推理,将已注册的模型作为输入传递。 该组件在模型项目中查找特征检索规范。
此外,出于训练目的,观察数据具有目标变量。 然而,批量推理观察数据不具有目标变量。 特征检索步骤将观察数据与特征连接,并输出用于批量推理的数据。
该管道使用上一步骤中的批量推理输入数据,对模型运行推理,并将预测值附加为输出。
注意
本例中使用作业进行批量推理。 还可以在 Azure 机器学习中使用批处理终结点。
from azure.ai.ml import load_job # will be used later # set the batch inference pipeline path batch_inference_pipeline_path = ( root_dir + "/project/fraud_model/pipelines/batch_inference_pipeline.yaml" ) batch_inference_pipeline_definition = load_job(source=batch_inference_pipeline_path) # run the training pipeline batch_inference_pipeline_job = ws_client.jobs.create_or_update( batch_inference_pipeline_definition ) # stream the run logs ws_client.jobs.stream(batch_inference_pipeline_job.name)
检查批量推理的输出数据
在管道视图中:
在
outputs
卡中选择inference_step
。复制
Data
字段值。 它类似于azureml_995abbc2-3171-461e-8214-c3c5d17ede83_output_data_data_with_prediction:1
。将
Data
字段值粘贴到以下单元格中,并使用不同的名称和版本值。 最后一个字符是版本,前面带一个冒号 (:
)。请注意,批量推理管道生成了
predict_is_fraud
列。在批量推理管道 (/project/fraud_mode/pipelines/batch_inference_pipeline.yaml) 输出中,系统创建了一个未跟踪的数据资产,其中 GUID 作为名称值,
1
作为版本值。 发生这种情况的原因是,你没有未inference_step
的outputs
提供name
或version
值。 在此单元格中,将从资产中派生数据路径并显示。inf_data_output = ws_client.data.get( name="azureml_1c106662-aa5e-4354-b5f9-57c1b0fdb3a7_output_data_data_with_prediction", version="1", ) inf_output_df = spark.read.parquet(inf_data_output.path) display(inf_output_df.head(5))
清理
本系列的教程五将介绍如何删除资源。
后续步骤
- 了解概念特征存储概念和托管特征存储中的顶级实体。
- 了解托管特征存储的标识和访问控制。
- 查看托管特征存储故障排除指南。
- 查看 YAML 参考。