本文内容
重要
此功能目前处于公开预览状态。 此预览版在提供时没有附带服务级别协议,我们不建议将其用于生产工作负荷。 某些功能可能不受支持或者受限。
有关详细信息,请参阅适用于 Azure 预览版的补充使用条款。
使用 Azure 机器学习托管特征存储可以发现、创建和操作特征。 特征在机器学习生命周期中充当连接组织,该生命周期始于原型制作阶段,在此阶段你可以试验各种特征。 该生命周期会持续到操作化阶段,在此阶段你可以部署模型,并且进行可以查找特征数据的推理步骤。 有关特征平台的详细信息,请参阅特征平台概念。
本教程介绍了如何使用域特定语言开发特征集。 托管特征平台的域特定语言 (DSL) 以一种简单、方便使用的方式来定义最常用的功能聚合。 通过特征平台 SDK,用户可以使用 DSL 表达式来执行最常用的聚合。 与用户定义函数 (UDF) 相比,使用 DSL 表达式的聚合能确保结果一致。 此外,这些聚合可以避免编写 UDF 的开销。
本教程介绍如何
- 创建一个全新的最小特征平台工作区
- 使用域特定语言 (DSL) 在本地开发和测试功能
- 使用用户定义函数 (UDF) 开发特征集,该函数能像使用 DSL 创建的特征集一样执行转换
- 比较分别使用 DSL 和 UDF 创建特征集的结果
- 向功能商店注册功能商店实体
- 将使用 DSL 创建的特征集注册到特征平台
- 使用创建的特征生成示例训练数据
备注
本教程将 Azure 机器学习笔记本与无服务器 Spark 计算配合使用。
在继续阅读本教程之前,请确保满足以下先决条件:
- 一个 Azure 机器学习工作区。 如果没有,请参阅快速入门:创建工作区资源了解如何创建。
- 若要执行本文中的步骤,用户帐户必须在待创建特征平台的资源组中持有“所有者”或“参与者”角色。
本教程使用 Python 特征平台的核心 SDK (azureml-featurestore
)。 本 SDK 用于在特征平台、特征集和特征平台实体上创建、读取、更新和删除 (CRUD) 操作。
无需为本教程显式安装这些资源,因为在此处所示的设置说明中,conda.yml
文件涵盖了它们。
若要准备用于开发的笔记本环境,请执行以下操作:
使用以下命令将 示例存储库 - (azureml-examples) 克隆到本地计算机上:
git clone --depth 1 https://github.com/Azure/azureml-examples
还可以从示例存储库 (azureml-examples) 下载 zip 文件。 在此页中,首先选择
code
下拉列表,然后选择Download ZIP
。 然后,将内容解压缩到本地计算机上的文件夹中。将特征平台示例目录上传到项目工作区
- 打开 Azure 机器学习工作区的 Azure 机器学习工作室 UI
- 在左侧面板中选择笔记本
- 在目录列表中选择用户名
- 选择省略号 (...),然后选择“上传文件夹”
- 从克隆的目录路径中选择功能商店示例文件夹:
azureml-examples/sdk/python/featurestore-sample
运行教程
- 选项 1:创建新的笔记本,并逐步执行本文档中的说明
- 选项 2:打开现有笔记本
featurestore_sample/notebooks/sdk_only/7.Develop-feature-set-domain-specific-language-dsl.ipynb
。 可以将此文档保持打开状态,并参阅此文档以获取更多说明和文档链接
若要配置笔记本环境,则必须上传
conda.yml
文件选择左侧面板中的笔记本,然后选择文件标签
导航至
env
目录(选择 Users>your_user_name>featurestore_sample>project>env),然后选择conda.yml
文件选择“下载”
在顶部的导航“计算”下拉列表中选择“无服务器 Spark 计算”。 此操作可能需要一到两分钟。 等待顶部的状态栏显示“配置会话”
在顶部状态栏中选择“配置会话”
选择“设置”
选择 的“Apache Spark 版本”
Spark version 3.3
该操作非必选项,但如果要避免频繁重启无服务器 Spark 会话,请增加 “会话超时”(空闲时间)的数值
在“配置设置”下,定义“属性”
spark.jars.packages
和“值”com.microsoft.azure:azureml-fs-scala-impl:1.0.4
选择“Python 包”
选择“上传 conda 文件”
选择下载到本地设备上的
conda.yml
文件选择“应用”
提示
除此该步骤以外,每次启动新的 Spark 会话或会话超时后,都必须运行其他所有步骤。
此代码单元为示例设置根目录并开始 Spark 会话。 安装所有依赖项并启动 Spark 会话大约需要 10 分钟:
import os # Please update your alias USER_NAME below (or any custom directory you uploaded the samples to). # You can find the name from the directory structure in the left nav root_dir = "./Users/USER_NAME/featurestore_sample" if os.path.isdir(root_dir): print("The folder exists.") else: print("The folder does not exist. Please create or fix the path")
创建最小的特征平台:
从 Azure 机器学习工作室用户界面上或使用 Azure 机器学习 Python SDK 代码在自选区域中创建特征平台。
选项 1:从 Azure 机器学习工作室用户界面上创建特征平台
- 导航至特征平台用户界面的登陆页面
- 选择“+创建”
- “基本信息”选项卡将会出现
- 选择特征平台的“名称”
- 选择“订阅”
- 选择“资源组”
- 选择“区域”
- 选择“Apache Spark 版本”3.3,然后选择“下一步”
- “具体化”选项卡将会出现
- 切换为“启用具体化”
- 选择“订阅” ,“用户身份”,然后“分配用户托管标识”
- 在“脱机存储”下选择“来自 Azure 订阅”
- 选择“存储名称”和“Azure Date Lake Gen2 文件系统名称”,然后选择“下一步”
- 在“查看”选项卡中查看信息,然后选择“创建”
选项 2:使用 Python SDK 创建特征平台,提供
featurestore_name
、featurestore_resource_group_name
和featurestore_subscription_id
的值,并执行此单元来创建一个最小的特征平台:import os featurestore_name = "<FEATURE_STORE_NAME>" featurestore_resource_group_name = "<RESOURCE_GROUP>" featurestore_subscription_id = "<SUBSCRIPTION_ID>" ##### Create Feature Store ##### from azure.ai.ml import MLClient from azure.ai.ml.entities import ( FeatureStore, FeatureStoreEntity, FeatureSet, ) from azure.ai.ml.identity import AzureMLOnBehalfOfCredential ml_client = MLClient( AzureMLOnBehalfOfCredential(), subscription_id=featurestore_subscription_id, resource_group_name=featurestore_resource_group_name, ) featurestore_location = "eastus" fs = FeatureStore(name=featurestore_name, location=featurestore_location) # wait for featurestore creation fs_poller = ml_client.feature_stores.begin_create(fs) print(fs_poller.result())
在脱机存储中为用户标识分配权限:
如果特征数据已具体化,则需要给用户身份分配“存储 Blob 数据读取者”角色,才能从脱机具体化存储中读取特征数据。
- 打开 Azure 机器学习全球登陆页
- 在左侧导航中选择“特征平台”
- 此时会显示可访问的特征平台列表。 选择先前创建的特征平台
- 在“脱机具体化存储”卡选择“帐户名称”下的存储帐户链接,导航到脱机存储的 ADLS Gen2 存储帐户
- 请访问此资源 ,详细了解如何在 ADLS Gen2 的脱机存储帐户为用户身份分配“存储 Blob 数据数据读取者”角色。 等待权限传播。
目前支持以下聚合表达式:
- 平均 -
avg
- 求和 -
sum
- 计数 -
count
- 最小 -
min
- 最大 -
max
下表提供的基准可以比较分别使用 DSL 表达式和 UDF 的聚合之间的性能,使用 23.5 GB 大小的代表性数据集,属性如下:
-
numberOfSourceRows
: 348,244,374 -
numberOfOfflineMaterializedRows
: 227,361,061
函数 | 表达式 | UDF 执行时间 | DSL 执行时间 |
---|---|---|---|
get_offline_features(use_materialized_store=false) |
sum ,avg ,count |
~2 小时 | < 5 分钟 |
get_offline_features(use_materialized_store=true) |
sum ,avg ,count |
~1.5 小时 | < 5 分钟 |
materialize() |
sum ,avg ,count |
~1 小时 | < 15 分钟 |
备注
min
和 max
DSL 表达式相较 UDF 没有任何性能改进。 建议对 min
和 max
转换使用 UDF。
执行此代码单元以创建特征集规范,使用 DSL 表达式和 parquet 文件作为源数据。
from azureml.featurestore import create_feature_set_spec from azureml.featurestore.contracts.feature import Feature from azureml.featurestore.transformation import ( TransformationExpressionCollection, WindowAggregation, ) from azureml.featurestore.contracts import ( DateTimeOffset, TransformationCode, Column, ColumnType, SourceType, TimestampColumn, ) from azureml.featurestore.feature_source import ParquetFeatureSource dsl_feature_set_spec = create_feature_set_spec( source=ParquetFeatureSource( path="wasbs://data@azuremlexampledata.blob.core.windows.net/feature-store-prp/datasources/transactions-source/*.parquet", timestamp_column=TimestampColumn(name="timestamp"), source_delay=DateTimeOffset(days=0, hours=0, minutes=20), ), index_columns=[Column(name="accountID", type=ColumnType.string)], features=[ Feature(name="f_transaction_3d_count", type=ColumnType.Integer), Feature(name="f_transaction_amount_3d_sum", type=ColumnType.DOUBLE), Feature(name="f_transaction_amount_3d_avg", type=ColumnType.DOUBLE), Feature(name="f_transaction_7d_count", type=ColumnType.Integer), Feature(name="f_transaction_amount_7d_sum", type=ColumnType.DOUBLE), Feature(name="f_transaction_amount_7d_avg", type=ColumnType.DOUBLE), ], feature_transformation=TransformationExpressionCollection( transformation_expressions=[ WindowAggregation( feature_name="f_transaction_3d_count", aggregation="count", window=DateTimeOffset(days=3), ), WindowAggregation( feature_name="f_transaction_amount_3d_sum", source_column="transactionAmount", aggregation="sum", window=DateTimeOffset(days=3), ), WindowAggregation( feature_name="f_transaction_amount_3d_avg", source_column="transactionAmount", aggregation="avg", window=DateTimeOffset(days=3), ), WindowAggregation( feature_name="f_transaction_7d_count", aggregation="count", window=DateTimeOffset(days=7), ), WindowAggregation( feature_name="f_transaction_amount_7d_sum", source_column="transactionAmount", aggregation="sum", window=DateTimeOffset(days=7), ), WindowAggregation( feature_name="f_transaction_amount_7d_avg", source_column="transactionAmount", aggregation="avg", window=DateTimeOffset(days=7), ), ] ), ) dsl_feature_set_spec
此代码单元规定特征窗口的开始和结束时间。
from datetime import datetime st = datetime(2020, 1, 1) et = datetime(2023, 6, 1)
此代码单元使用
to_spark_dataframe()
从上述通过 DSL 表达式定义的特征集规范中获取规定特征窗口的数据帧:dsl_df = dsl_feature_set_spec.to_spark_dataframe( feature_window_start_date_time=st, feature_window_end_date_time=et )
打印 DSL 表达式定义的特征集中的一些示例特征值:
display(dsl_df)
创建使用 UDF 执行相同转换的特征集规范:
from azureml.featurestore import create_feature_set_spec from azureml.featurestore.contracts import ( DateTimeOffset, TransformationCode, Column, ColumnType, SourceType, TimestampColumn, ) from azureml.featurestore.feature_source import ParquetFeatureSource transactions_featureset_code_path = ( root_dir + "/featurestore/featuresets/transactions/transformation_code" ) udf_feature_set_spec = create_feature_set_spec( source=ParquetFeatureSource( path="wasbs://data@azuremlexampledata.blob.core.windows.net/feature-store-prp/datasources/transactions-source/*.parquet", timestamp_column=TimestampColumn(name="timestamp"), source_delay=DateTimeOffset(days=0, hours=0, minutes=20), ), transformation_code=TransformationCode( path=transactions_featureset_code_path, transformer_class="transaction_transform.TransactionFeatureTransformer", ), index_columns=[Column(name="accountID", type=ColumnType.string)], infer_schema=True, ) udf_feature_set_spec
此转换代码显示 UDF 定义的转换与 DSL 表达式的相同:
class TransactionFeatureTransformer(Transformer): def _transform(self, df: DataFrame) -> DataFrame: days = lambda i: i * 86400 w_3d = ( Window.partitionBy("accountID") .orderBy(F.col("timestamp").cast("long")) .rangeBetween(-days(3), 0) ) w_7d = ( Window.partitionBy("accountID") .orderBy(F.col("timestamp").cast("long")) .rangeBetween(-days(7), 0) ) res = ( df.withColumn("transaction_7d_count", F.count("transactionID").over(w_7d)) .withColumn( "transaction_amount_7d_sum", F.sum("transactionAmount").over(w_7d) ) .withColumn( "transaction_amount_7d_avg", F.avg("transactionAmount").over(w_7d) ) .withColumn("transaction_3d_count", F.count("transactionID").over(w_3d)) .withColumn( "transaction_amount_3d_sum", F.sum("transactionAmount").over(w_3d) ) .withColumn( "transaction_amount_3d_avg", F.avg("transactionAmount").over(w_3d) ) .select( "accountID", "timestamp", "transaction_3d_count", "transaction_amount_3d_sum", "transaction_amount_3d_avg", "transaction_7d_count", "transaction_amount_7d_sum", "transaction_amount_7d_avg", ) ) return res
使用
to_spark_dataframe()
从上述 UDF 定义的特征集规范中获取数据帧:udf_df = udf_feature_set_spec.to_spark_dataframe( feature_window_start_date_time=st, feature_window_end_date_time=et )
比较结果并验证 DSL 表达式的结果与通过 UDF 执行的转换之间的一致性。 选择一个
accountID
值以比较两个数据帧中的值,以此进行验证:display(dsl_df.where(dsl_df.accountID == "A1899946977632390").sort("timestamp"))
display(udf_df.where(udf_df.accountID == "A1899946977632390").sort("timestamp"))
若要在特征平台中注册特征集规范,则必须以特定格式保存。 若要检查生成的 transactions-dsl
特征集规范,在文件树中打开此文件以查看规范:featurestore/featuresets/transactions-dsl/spec/FeaturesetSpec.yaml
特征集规范包含以下元素:
-
source
:对存储资源的引用;在本例中是 blob 存储中的 parquet 文件 -
features
:包含特征及其数据类型的列表。 如果提供转换代码,该代码必须返回映射到特征和数据类型的数据帧 -
index_columns
:访问特征集数值所需的联接键
有关更多信息,请参阅顶级特征平台实体文档和特征集规范 YAML 引用资源。
坚持使用特征集规范的另一个好处是它可以由源代码管理。
执行此代码单元以使用 parquet 数据源和 DSL 表达式为特征集编写 YAML 规范文件:
dsl_spec_folder = root_dir + "/featurestore/featuresets/transactions-dsl/spec" dsl_feature_set_spec.dump(dsl_spec_folder, overwrite=True)
执行此代码单元以使用 UDF 为特征集编写 YAML 规范文件:
udf_spec_folder = root_dir + "/featurestore/featuresets/transactions-udf/spec" udf_feature_set_spec.dump(udf_spec_folder, overwrite=True)
本教程中的以下步骤使用两个 SDK。
特征平台 CRUD SDK:Azure 机器学习 (AzureML) SDK
MLClient
(包名称azure-ai-ml
),类似于 Azure 机器学习工作区使用的 SDK。 此 SDK 能够辅助特征平台的 CRUD 操作,- 创建
- 读取
- 更新
- 删除
适合特征平台和特征集实体,因为特征平台是作为 Azure 机器学习工作区类型实现的
特征平台核心 SDK:此 SDK (
azureml-featurestore
) 能够辅助特征集的开发和使用:from azure.ai.ml import MLClient from azure.ai.ml.entities import ( FeatureStore, FeatureStoreEntity, FeatureSet, ) from azure.ai.ml.identity import AzureMLOnBehalfOfCredential from azureml.featurestore import FeatureStoreClient fs_client = MLClient( AzureMLOnBehalfOfCredential(), featurestore_subscription_id, featurestore_resource_group_name, featurestore_name, ) featurestore = FeatureStoreClient( credential=AzureMLOnBehalfOfCredential(), subscription_id=featurestore_subscription_id, resource_group_name=featurestore_resource_group_name, name=featurestore_name, )
创建帐户实体,该实体具有 accountID
类型的联接键 string
:
from azure.ai.ml.entities import DataColumn
account_entity_config = FeatureStoreEntity(
name="account",
version="1",
index_columns=[DataColumn(name="accountID", type="string")],
)
poller = fs_client.feature_store_entities.begin_create_or_update(account_entity_config)
print(poller.result())
将(使用 DSL的)
transactions-dsl
特征集注册到特征平台,并启用脱机具体化以及使用导出的特征集规范:from azure.ai.ml.entities import ( FeatureSet, FeatureSetSpecification, MaterializationSettings, MaterializationComputeResource, ) materialization_settings = MaterializationSettings( offline_enabled=True, resource=MaterializationComputeResource(instance_type="standard_e8s_v3"), spark_configuration={ "spark.driver.cores": 4, "spark.driver.memory": "36g", "spark.executor.cores": 4, "spark.executor.memory": "36g", "spark.executor.instances": 2, }, schedule=None, ) fset_config = FeatureSet( name="transactions-dsl", version="1", entities=["azureml:account:1"], stage="Development", specification=FeatureSetSpecification(path=dsl_spec_folder), materialization_settings=materialization_settings, tags={"data_type": "nonPII"}, ) poller = fs_client.feature_sets.begin_create_or_update(fset_config) print(poller.result())
具体化特征集以将转换的特征数据持续存储到脱机存储中:
poller = fs_client.feature_sets.begin_backfill( name="transactions-dsl", version="1", feature_window_start_time=st, feature_window_end_time=et, spark_configuration={}, data_status=["None", "Incomplete"], ) print(poller.result().job_ids)
执行此代码单元以跟踪具体化作业的进度:
# get the job URL, and stream the job logs (the back fill job could take 10+ minutes to complete) fs_client.jobs.stream(poller.result().job_ids[0])
打印特征集中的示例数据。 输出信息显示数据是从具体化存储中检索的。
get_offline_features()
方法用于检索训练/推理数据,默认情况下也使用具体化存储:# look up the featureset by providing name and version transactions_featureset = featurestore.feature_sets.get("transactions-dsl", "1") display(transactions_featureset.to_spark_dataframe().head(5))
观察数据通常是训练和推理步骤中使用的核心数据。 然后,观察数据与特征数据联接,以创建完整的训练数据资源。 观察数据是在事件发生期间捕获的数据。 这种情况具有核心事务数据,包括事务 ID、帐户 ID 和事务金额值。 由于该数据用于训练,它也追加了目标变量 (is_fraud
)。
首先,浏览观察数据:
observation_data_path = "wasbs://data@azuremlexampledata.blob.core.windows.net/feature-store-prp/observation_data/train/*.parquet" observation_data_df = spark.read.parquet(observation_data_path) obs_data_timestamp_column = "timestamp" display(observation_data_df) # Note: the timestamp column is displayed in a different format. Optionally, you can can call training_df.show() to see correctly formatted value
选择会成为训练数据一部分的特征,并使用特征平台 SDK 生成训练数据:
featureset = featurestore.feature_sets.get("transactions-dsl", "1") # you can select features in pythonic way features = [ featureset.get_feature("f_transaction_amount_7d_sum"), featureset.get_feature("f_transaction_amount_7d_avg"), ] # you can also specify features in string form: featureset:version:feature more_features = [ "transactions-dsl:1:f_transaction_amount_3d_sum", "transactions-dsl:1:f_transaction_3d_count", ] more_features = featurestore.resolve_feature_uri(more_features) features.extend(more_features)
get_offline_features()
函数使用时间点联接将特征追加到观察数据中。 显示从时间点联接获取的训练数据帧:from azureml.featurestore import get_offline_features training_df = get_offline_features( features=features, observation_data=observation_data_df, timestamp_column=obs_data_timestamp_column, ) display(training_df.sort("transactionID", "accountID", "timestamp"))
将(使用 UDF 的)
transactions-udf
特征集注册到特征平台,并使用导出的特征集规范。 为此特征集启用脱机具体化并在特征平台中注册:fset_config = FeatureSet( name="transactions-udf", version="1", entities=["azureml:account:1"], stage="Development", specification=FeatureSetSpecification(path=udf_spec_folder), materialization_settings=materialization_settings, tags={"data_type": "nonPII"}, ) poller = fs_client.feature_sets.begin_create_or_update(fset_config) print(poller.result())
从要成为训练数据一部分的特征集(使用 DSL 和 UDF 创建)中选择特征,并使用特征平台 SDK 生成训练数据:
featureset_dsl = featurestore.feature_sets.get("transactions-dsl", "1") featureset_udf = featurestore.feature_sets.get("transactions-udf", "1") # you can select features in pythonic way features = [ featureset_dsl.get_feature("f_transaction_amount_7d_sum"), featureset_udf.get_feature("transaction_amount_7d_avg"), ] # you can also specify features in string form: featureset:version:feature more_features = [ "transactions-udf:1:transaction_amount_3d_sum", "transactions-dsl:1:f_transaction_3d_count", ] more_features = featurestore.resolve_feature_uri(more_features) features.extend(more_features)
get_offline_features()
函数使用时间点联接将特征追加到观察数据中。 显示从时间点联接获取的训练数据帧:from azureml.featurestore import get_offline_features training_df = get_offline_features( features=features, observation_data=observation_data_df, timestamp_column=obs_data_timestamp_column, ) display(training_df.sort("transactionID", "accountID", "timestamp"))
特征通过时间点联接追加到训练数据中。 生成的训练数据可用于后续训练和批量推理步骤。
本系列的教程五将介绍如何删除资源。