教程 5:使用自定义源开发特征集

使用 Azure 机器学习托管特征存储可以发现、创建和操作特征。 特征在机器学习生命周期中充当连接组织,该生命周期始于原型制作阶段,在此阶段你可以试验各种特征。 该生命周期会持续到操作化阶段,在此阶段你可以部署模型,并且推理步骤会查找特征数据。 有关特征存储的详细信息,请参阅特征存储概念资源。

本教程系列的第 1 部分介绍了如何使用自定义转换创建特征集规范、启用具体化和执行回填。 第 2 部分演示了如何在试验和训练流中试验特征。 第 3 部分说明了 transactions 特征集的反复具体化,并展示了如何在已注册模型上运行批量推理管道。 第 4 部分介绍了如何运行批量推理。

在本教程中,你将:

  • 定义从自定义数据源加载数据的逻辑。
  • 配置并注册要从此自定义数据源使用的特征集。
  • 测试已注册的特征集。

先决条件

注意

本教程将 Azure 机器学习笔记本与无服务器 Spark 计算配合使用

  • 确保完成本系列中前面的教程。 本教程将重复使用之前教程中创建的特征存储和其他资源。

设置

本教程使用 Python 特征存储核心 SDK (azureml-featurestore)。 该 Python SDK 用于在特征存储、特征集和特征存储实体上执行创建、读取、更新和删除 (CRUD) 操作。

无需为本教程显式安装这些资源,因为在此处所示的设置说明中,conda.yml 文件涵盖了它们。

配置 Azure 机器学习 Spark 笔记本

可以创建新笔记本,并分步执行本教程中的说明。 还可以打开并运行现有笔记本 featurestore_sample/notebooks/sdk_only/5.Develop-feature-set-custom-source.ipynb。 可以将此教程保持在打开状态,并参考它来获取文档链接和更多说明。

  1. 在顶部菜单的“计算”下拉列表中,选择“Azure 机器学习无服务器 Spark”下的“无服务器 Spark 计算”。

  2. 配置会话:

    1. 在顶部状态栏中选择“配置会话”
    2. 选择“Python 包”选项卡,再选择“上传 conda 文件”
    3. 选择“上传 Conda 文件”
    4. 上传在教程一中上传的 conda.yml 文件
    5. (可选)增加会话超时(空闲时间)以避免频繁重新运行先决条件

为示例设置根目录

此代码单元为示例设置根目录。 安装所有依赖项并启动 Spark 会话大约需要 10 分钟。

import os

# Please update the dir to ./Users/{your_user_alias} (or any custom directory you uploaded the samples to).
# You can find the name from the directory structure in the left navigation panel.
root_dir = "./Users/<your_user_alias>/featurestore_sample"

if os.path.isdir(root_dir):
    print("The folder exists.")
else:
    print("The folder does not exist. Please create or fix the path")

初始化特征存储工作区的 CRUD 客户端

初始化特征存储工作区的 MLClient,以涵盖特征存储工作区上的创建、读取、更新和删除 (CRUD) 操作。

from azure.ai.ml import MLClient
from azure.ai.ml.identity import AzureMLOnBehalfOfCredential

# Feature store
featurestore_name = (
    "<FEATURESTORE_NAME>"  # use the same name that was used in the tutorial #1
)
featurestore_subscription_id = os.environ["AZUREML_ARM_SUBSCRIPTION"]
featurestore_resource_group_name = os.environ["AZUREML_ARM_RESOURCEGROUP"]

# Feature store ml client
fs_client = MLClient(
    AzureMLOnBehalfOfCredential(),
    featurestore_subscription_id,
    featurestore_resource_group_name,
    featurestore_name,
)

初始化特征存储核心 SDK 客户端

如前所述,本教程使用 Python 特征存储核心 SDK (azureml-featurestore)。 这一初始化的 SDK 客户端涵盖特征存储、特征集和特征存储实体上的创建、读取、更新和删除 (CRUD) 操作。

from azureml.featurestore import FeatureStoreClient
from azure.ai.ml.identity import AzureMLOnBehalfOfCredential

featurestore = FeatureStoreClient(
    credential=AzureMLOnBehalfOfCredential(),
    subscription_id=featurestore_subscription_id,
    resource_group_name=featurestore_resource_group_name,
    name=featurestore_name,
)

自定义源定义

可以从具有自定义源定义的任何数据存储中定义自己的源加载逻辑。 实现源处理器用户定义函数 (UDF) 类(本教程中的 CustomSourceTransformer),以使用此特征。 此类应定义 __init__(self, **kwargs) 函数和 process(self, start_time, end_time, **kwargs) 函数。 kwargs 字典作为特征集规范定义的一部分提供。 然后,此定义将传递给 UDF。 计算 start_timeend_time 参数并将其传递给 UDF 函数。

下面是源处理器 UDF 类的示例代码:

from datetime import datetime

class CustomSourceTransformer:
    def __init__(self, **kwargs):
        self.path = kwargs.get("source_path")
        self.timestamp_column_name = kwargs.get("timestamp_column_name")
        if not self.path:
            raise Exception("`source_path` is not provided")
        if not self.timestamp_column_name:
            raise Exception("`timestamp_column_name` is not provided")

    def process(
        self, start_time: datetime, end_time: datetime, **kwargs
    ) -> "pyspark.sql.DataFrame":
        from pyspark.sql import SparkSession
        from pyspark.sql.functions import col, lit, to_timestamp

        spark = SparkSession.builder.getOrCreate()
        df = spark.read.json(self.path)

        if start_time:
            df = df.filter(col(self.timestamp_column_name) >= to_timestamp(lit(start_time)))

        if end_time:
            df = df.filter(col(self.timestamp_column_name) < to_timestamp(lit(end_time)))

        return df

使用自定义源创建特征集规范,并在本地进行试验

现在,使用自定义源定义创建特征集规范,并在开发环境中使用它来试验特征集。 附加到“无服务器 Spark 计算”的教程笔记本用作开发环境。

from azureml.featurestore import create_feature_set_spec
from azureml.featurestore.feature_source import CustomFeatureSource
from azureml.featurestore.contracts import (
    SourceProcessCode,
    TransformationCode,
    Column,
    ColumnType,
    DateTimeOffset,
    TimestampColumn,
)

transactions_source_process_code_path = (
    root_dir
    + "/featurestore/featuresets/transactions_custom_source/source_process_code"
)
transactions_feature_transform_code_path = (
    root_dir
    + "/featurestore/featuresets/transactions_custom_source/feature_process_code"
)

udf_featureset_spec = create_feature_set_spec(
    source=CustomFeatureSource(
        kwargs={
            "source_path": "wasbs://data@azuremlexampledata.blob.core.windows.net/feature-store-prp/datasources/transactions-source-json/*.json",
            "timestamp_column_name": "timestamp",
        },
        timestamp_column=TimestampColumn(name="timestamp"),
        source_delay=DateTimeOffset(days=0, hours=0, minutes=20),
        source_process_code=SourceProcessCode(
            path=transactions_source_process_code_path,
            process_class="source_process.CustomSourceTransformer",
        ),
    ),
    feature_transformation=TransformationCode(
        path=transactions_feature_transform_code_path,
        transformer_class="transaction_transform.TransactionFeatureTransformer",
    ),
    index_columns=[Column(name="accountID", type=ColumnType.string)],
    source_lookback=DateTimeOffset(days=7, hours=0, minutes=0),
    temporal_join_lookback=DateTimeOffset(days=1, hours=0, minutes=0),
    infer_schema=True,
)

udf_featureset_spec

接下来,定义特征窗口,并在此特征窗口中显示特征值。

from datetime import datetime

st = datetime(2023, 1, 1)
et = datetime(2023, 6, 1)

display(
    udf_featureset_spec.to_spark_dataframe(
        feature_window_start_date_time=st, feature_window_end_date_time=et
    )
)

导出为特征集规范

为了向特征存储注册特征集规范,首先以特定格式保存该规范。 查看生成的 transactions_custom_source 特征集规范。 从文件树打开此文件以查看规范:featurestore/featuresets/transactions_custom_source/spec/FeaturesetSpec.yaml

规范包含以下元素:

  • features:特征及其数据类型的列表。
  • index_columns:访问特征集中的值所需的联接键。

若要详细了解规范,请参阅了解托管特征存储中的顶级实体CLI (v2) 特征集 YAML 架构资源。

特征集规范持久性具有另一个好处:特征集规范可以进行源代码管理。

feature_spec_folder = (
    root_dir + "/featurestore/featuresets/transactions_custom_source/spec"
)

udf_featureset_spec.dump(feature_spec_folder)

将事务功能集注册到功能商店

使用此代码向特征存储注册从自定义源加载的特征集资产。 然后,可以重复使用该资产,并轻松共享它。 特征集资产注册提供托管功能,包括版本控制与具体化。

from azure.ai.ml.entities import FeatureSet, FeatureSetSpecification

transaction_fset_config = FeatureSet(
    name="transactions_custom_source",
    version="1",
    description="transactions feature set loaded from custom source",
    entities=["azureml:account:1"],
    stage="Development",
    specification=FeatureSetSpecification(path=feature_spec_folder),
    tags={"data_type": "nonPII"},
)

poller = fs_client.feature_sets.begin_create_or_update(transaction_fset_config)
print(poller.result())

获取已注册的特征集并打印相关信息。

# Look up the feature set by providing name and version
transactions_fset_config = featurestore.feature_sets.get(
    name="transactions_custom_source", version="1"
)
# Print feature set information
print(transactions_fset_config)

从已注册的特征集测试特征生成

使用特征集的 to_spark_dataframe() 函数测试已注册特征集中的特征生成,并显示特征。 print-txn-fset-sample-values

df = transactions_fset_config.to_spark_dataframe()
display(df)

你应该能够成功提取已注册的特征集作为 Spark 数据帧,然后显示它。 现在,可以将这些特征用于与观察数据的时间点联接,以及机器学习管道中的后续步骤。

清理

如果为教程创建了资源组,则可以删除该资源组,这将删除与此教程关联的所有资源。 否则,你可以单独删除资源:

  • 若要删除特征存储,请在 Azure 门户中打开资源组,选择特征存储并将其删除。
  • 删除特征存储时,不会删除分配给特征存储工作区的用户分配的托管标识 (UAI)。 若要删除 UAI,请按照这些说明进行操作。
  • 若要删除存储帐户类型的脱机存储,请在 Azure 门户中打开资源组,选择创建的存储并将其删除。
  • 若要删除 Azure Cache for Redis 实例,请在 Azure 门户中打开资源组,选择创建的实例并将其删除。

后续步骤