教程 5:使用自定义源开发特征集
使用 Azure 机器学习托管特征存储可以发现、创建和操作特征。 特征在机器学习生命周期中充当连接组织,该生命周期始于原型制作阶段,在此阶段你可以试验各种特征。 该生命周期会持续到操作化阶段,在此阶段你可以部署模型,并且推理步骤会查找特征数据。 有关特征存储的详细信息,请参阅特征存储概念资源。
本教程系列的第 1 部分介绍了如何使用自定义转换创建特征集规范、启用具体化和执行回填。 第 2 部分演示了如何在试验和训练流中试验特征。 第 3 部分说明了 transactions
特征集的反复具体化,并展示了如何在已注册模型上运行批量推理管道。 第 4 部分介绍了如何运行批量推理。
在本教程中,你将:
- 定义从自定义数据源加载数据的逻辑。
- 配置并注册要从此自定义数据源使用的特征集。
- 测试已注册的特征集。
先决条件
注意
本教程将 Azure 机器学习笔记本与无服务器 Spark 计算配合使用。
- 确保完成本系列中前面的教程。 本教程将重复使用之前教程中创建的特征存储和其他资源。
设置
本教程使用 Python 特征存储核心 SDK (azureml-featurestore
)。 该 Python SDK 用于在特征存储、特征集和特征存储实体上执行创建、读取、更新和删除 (CRUD) 操作。
无需为本教程显式安装这些资源,因为在此处所示的设置说明中,conda.yml
文件涵盖了它们。
配置 Azure 机器学习 Spark 笔记本
可以创建新笔记本,并分步执行本教程中的说明。 还可以打开并运行现有笔记本 featurestore_sample/notebooks/sdk_only/5.Develop-feature-set-custom-source.ipynb。 可以将此教程保持在打开状态,并参考它来获取文档链接和更多说明。
在顶部菜单的“计算”下拉列表中,选择“Azure 机器学习无服务器 Spark”下的“无服务器 Spark 计算”。
配置会话:
- 在顶部状态栏中选择“配置会话”
- 选择“Python 包”选项卡,再选择“上传 conda 文件”
- 选择“上传 Conda 文件”
- 上传在教程一中上传的 conda.yml 文件
- (可选)增加会话超时(空闲时间)以避免频繁重新运行先决条件
为示例设置根目录
此代码单元为示例设置根目录。 安装所有依赖项并启动 Spark 会话大约需要 10 分钟。
import os
# Please update the dir to ./Users/{your_user_alias} (or any custom directory you uploaded the samples to).
# You can find the name from the directory structure in the left navigation panel.
root_dir = "./Users/<your_user_alias>/featurestore_sample"
if os.path.isdir(root_dir):
print("The folder exists.")
else:
print("The folder does not exist. Please create or fix the path")
初始化特征存储工作区的 CRUD 客户端
初始化特征存储工作区的 MLClient
,以涵盖特征存储工作区上的创建、读取、更新和删除 (CRUD) 操作。
from azure.ai.ml import MLClient
from azure.ai.ml.identity import AzureMLOnBehalfOfCredential
# Feature store
featurestore_name = (
"<FEATURESTORE_NAME>" # use the same name that was used in the tutorial #1
)
featurestore_subscription_id = os.environ["AZUREML_ARM_SUBSCRIPTION"]
featurestore_resource_group_name = os.environ["AZUREML_ARM_RESOURCEGROUP"]
# Feature store ml client
fs_client = MLClient(
AzureMLOnBehalfOfCredential(),
featurestore_subscription_id,
featurestore_resource_group_name,
featurestore_name,
)
初始化特征存储核心 SDK 客户端
如前所述,本教程使用 Python 特征存储核心 SDK (azureml-featurestore
)。 这一初始化的 SDK 客户端涵盖特征存储、特征集和特征存储实体上的创建、读取、更新和删除 (CRUD) 操作。
from azureml.featurestore import FeatureStoreClient
from azure.ai.ml.identity import AzureMLOnBehalfOfCredential
featurestore = FeatureStoreClient(
credential=AzureMLOnBehalfOfCredential(),
subscription_id=featurestore_subscription_id,
resource_group_name=featurestore_resource_group_name,
name=featurestore_name,
)
自定义源定义
可以从具有自定义源定义的任何数据存储中定义自己的源加载逻辑。 实现源处理器用户定义函数 (UDF) 类(本教程中的 CustomSourceTransformer
),以使用此特征。 此类应定义 __init__(self, **kwargs)
函数和 process(self, start_time, end_time, **kwargs)
函数。 kwargs
字典作为特征集规范定义的一部分提供。 然后,此定义将传递给 UDF。 计算 start_time
和 end_time
参数并将其传递给 UDF 函数。
下面是源处理器 UDF 类的示例代码:
from datetime import datetime
class CustomSourceTransformer:
def __init__(self, **kwargs):
self.path = kwargs.get("source_path")
self.timestamp_column_name = kwargs.get("timestamp_column_name")
if not self.path:
raise Exception("`source_path` is not provided")
if not self.timestamp_column_name:
raise Exception("`timestamp_column_name` is not provided")
def process(
self, start_time: datetime, end_time: datetime, **kwargs
) -> "pyspark.sql.DataFrame":
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, to_timestamp
spark = SparkSession.builder.getOrCreate()
df = spark.read.json(self.path)
if start_time:
df = df.filter(col(self.timestamp_column_name) >= to_timestamp(lit(start_time)))
if end_time:
df = df.filter(col(self.timestamp_column_name) < to_timestamp(lit(end_time)))
return df
使用自定义源创建特征集规范,并在本地进行试验
现在,使用自定义源定义创建特征集规范,并在开发环境中使用它来试验特征集。 附加到“无服务器 Spark 计算”的教程笔记本用作开发环境。
from azureml.featurestore import create_feature_set_spec
from azureml.featurestore.feature_source import CustomFeatureSource
from azureml.featurestore.contracts import (
SourceProcessCode,
TransformationCode,
Column,
ColumnType,
DateTimeOffset,
TimestampColumn,
)
transactions_source_process_code_path = (
root_dir
+ "/featurestore/featuresets/transactions_custom_source/source_process_code"
)
transactions_feature_transform_code_path = (
root_dir
+ "/featurestore/featuresets/transactions_custom_source/feature_process_code"
)
udf_featureset_spec = create_feature_set_spec(
source=CustomFeatureSource(
kwargs={
"source_path": "wasbs://data@azuremlexampledata.blob.core.windows.net/feature-store-prp/datasources/transactions-source-json/*.json",
"timestamp_column_name": "timestamp",
},
timestamp_column=TimestampColumn(name="timestamp"),
source_delay=DateTimeOffset(days=0, hours=0, minutes=20),
source_process_code=SourceProcessCode(
path=transactions_source_process_code_path,
process_class="source_process.CustomSourceTransformer",
),
),
feature_transformation=TransformationCode(
path=transactions_feature_transform_code_path,
transformer_class="transaction_transform.TransactionFeatureTransformer",
),
index_columns=[Column(name="accountID", type=ColumnType.string)],
source_lookback=DateTimeOffset(days=7, hours=0, minutes=0),
temporal_join_lookback=DateTimeOffset(days=1, hours=0, minutes=0),
infer_schema=True,
)
udf_featureset_spec
接下来,定义特征窗口,并在此特征窗口中显示特征值。
from datetime import datetime
st = datetime(2023, 1, 1)
et = datetime(2023, 6, 1)
display(
udf_featureset_spec.to_spark_dataframe(
feature_window_start_date_time=st, feature_window_end_date_time=et
)
)
导出为特征集规范
为了向特征存储注册特征集规范,首先以特定格式保存该规范。 查看生成的 transactions_custom_source
特征集规范。 从文件树打开此文件以查看规范:featurestore/featuresets/transactions_custom_source/spec/FeaturesetSpec.yaml
。
规范包含以下元素:
features
:特征及其数据类型的列表。index_columns
:访问特征集中的值所需的联接键。
若要详细了解规范,请参阅了解托管特征存储中的顶级实体和 CLI (v2) 特征集 YAML 架构资源。
特征集规范持久性具有另一个好处:特征集规范可以进行源代码管理。
feature_spec_folder = (
root_dir + "/featurestore/featuresets/transactions_custom_source/spec"
)
udf_featureset_spec.dump(feature_spec_folder)
将事务功能集注册到功能商店
使用此代码向特征存储注册从自定义源加载的特征集资产。 然后,可以重复使用该资产,并轻松共享它。 特征集资产注册提供托管功能,包括版本控制与具体化。
from azure.ai.ml.entities import FeatureSet, FeatureSetSpecification
transaction_fset_config = FeatureSet(
name="transactions_custom_source",
version="1",
description="transactions feature set loaded from custom source",
entities=["azureml:account:1"],
stage="Development",
specification=FeatureSetSpecification(path=feature_spec_folder),
tags={"data_type": "nonPII"},
)
poller = fs_client.feature_sets.begin_create_or_update(transaction_fset_config)
print(poller.result())
获取已注册的特征集并打印相关信息。
# Look up the feature set by providing name and version
transactions_fset_config = featurestore.feature_sets.get(
name="transactions_custom_source", version="1"
)
# Print feature set information
print(transactions_fset_config)
从已注册的特征集测试特征生成
使用特征集的 to_spark_dataframe()
函数测试已注册特征集中的特征生成,并显示特征。
print-txn-fset-sample-values
df = transactions_fset_config.to_spark_dataframe()
display(df)
你应该能够成功提取已注册的特征集作为 Spark 数据帧,然后显示它。 现在,可以将这些特征用于与观察数据的时间点联接,以及机器学习管道中的后续步骤。
清理
如果为教程创建了资源组,则可以删除该资源组,这将删除与此教程关联的所有资源。 否则,你可以单独删除资源:
- 若要删除特征存储,请在 Azure 门户中打开资源组,选择特征存储并将其删除。
- 删除特征存储时,不会删除分配给特征存储工作区的用户分配的托管标识 (UAI)。 若要删除 UAI,请按照这些说明进行操作。
- 若要删除存储帐户类型的脱机存储,请在 Azure 门户中打开资源组,选择创建的存储并将其删除。
- 若要删除 Azure Cache for Redis 实例,请在 Azure 门户中打开资源组,选择创建的实例并将其删除。