本文介绍特征集规范、可用于该特征集的不同类型的转换以及相关的最佳做法。
特征集是由源数据转换生成的特征的集合。 特征集规范是特征集开发和本地测试的自包含定义。 在进行开发和本地测试后,可以在特征存储中将该特征集注册为特征集资产。 然后,你就拥有了可用作托管功能的版本控制和具体化。
定义特征集
FeatureSetSpec 定义了特征集。 此示例显示了特征集规范文件:
$schema: http://azureml/sdk-2-0/FeatureSetSpec.json
source:
type: parquet
path: abfs://file_system@account_name.dfs.core.chinacloudapi.cn/datasources/transactions-source/*.parquet
timestamp_column: # name of the column representing the timestamp.
name: timestamp
source_delay:
days: 0
hours: 3
minutes: 0
feature_transformation:
transformation_code:
path: ./transformation_code
transformer_class: transaction_transform.TransactionFeatureTransformer
features:
- name: transaction_7d_count
type: long
- name: transaction_amount_7d_sum
type: double
- name: transaction_amount_7d_avg
type: double
- name: transaction_3d_count
type: long
- name: transaction_amount_3d_sum
type: double
- name: transaction_amount_3d_avg
type: double
index_columns:
- name: accountID
type: string
source_lookback:
days: 7
hours: 0
minutes: 0
temporal_join_lookback:
days: 1
hours: 0
minutes: 0
注意
featurestore 核心 SDK 可自动生成特征集规范 YAML。 本教程 提供了一个示例。
在 FeatureSetSpec 定义中,以下属性与特征转换相关:
source:定义了源数据和相关元数据,例如数据中的时间戳列。 目前仅支持时序源数据和特征。source.timestamp_column属性是必需的feature_transformation.transformation_code:定义了特征转换器的代码文件夹位置features:定义了特征转换器生成的特征架构index_columns:定义了特征转换器生成的索引列架构source_lookback:当特征处理时序(例如窗口聚合)数据的聚合时,将使用此属性。 此属性的值指示过去源数据所需的时间范围(适用于时间 T 时的特征值)。有关详细信息,请参阅“最佳做法”部分。
如何计算特征?
定义 FeatureSetSpec 后,请调用 featureSetSpec.to_spark_dataframe(feature_window_start_ts, feature_window_end_ts) 来计算给定特征窗口中的特征。
计算在以下步骤中发生:
- 从源数据读取数据。
source定义了源数据。 按时间范围[feature_window_start_ts - source_lookback, feature_window_end_ts)筛选数据。 时间范围包括窗口的开始时间,但不包括窗口的结束时间 - 对数据应用特征转换器(由
feature_transformation.transformation_code定义)并获取计算特征 - 筛选特征值以仅返回特征窗口
[feature_window_start_ts, feature_window_end_ts)中的特征记录
在此代码示例中,特征存储 API 会计算特征:
# define the source data time window according to feature window
source_window_start_ts = feature_window_start_ts - source_lookback
source_window_end_ts = feature_window_end_ts
# read source table into a dataframe
df1 = spark.read.parquet(source.path).filter(df1["timestamp"] >= source_window_start_ts && df1["timestamp"] < source_window_end_ts)
# apply the feature transformer
df2 = FeatureTransformer._transform(df1)
## filter the feature(set) to include only feature records within the feature window
feature_set_df = df2.filter(df2["timestamp"] >= feature_window_start_ts && df2["timestamp"] < feature_window_end_ts)
特征转换器函数的输出架构
转换函数会输出一个数据帧,它在其架构中包含以下值:
- 与
FeatureSetSpec定义在名称和类型方面均匹配的索引列 - 与
source中时间戳定义匹配的时间戳列(名称)。source位于FeatureSetSpec中 - 将所有其他列名/类型值定义为
FeatureSetSpec中的features
实现适用于常见转换类型的特征转换器
行级转换
在行级别转换中,特定行上的特征值计算仅使用该行的列值。 从此源数据开始:
user_id |
timestamp |
total_spend |
|---|---|---|
| 1 | 2022-12-19 06:00:00 | 12.00 |
| 2 | 2022-12-10 03:00:00 | 56.00 |
| 1 | 2022-12-25 13:00:00 | 112.00 |
定义名为 user_total_spend_profile 的新特征集:
from pyspark.sql import Dataframe
from pyspark.ml import Transformer
class UserTotalSpendProfileTransformer(Transformer):
def _transform(df: Dataframe) -> Dataframe:
df.withColumn("is_high_spend_user", col("total_spend") > 100.0) \
.withColumn("is_low_spend_user", col("total_spend") < 20.0)
此特征集具有三个特征,其数据类型如下所示:
total_spend:双精度is_high_spend_user:布尔is_low_spend_user:布尔
这显示了计算的特征值:
user_id |
timestamp |
total_spend |
is_high_spend_user |
is_low_spend_user |
|---|---|---|---|---|
| 1 | 2022-12-19 06:00:00 | 12.00 | false | true |
| 2 | 2022-12-10 03:00:00 | 56.00 | false | false |
| 1 | 2022-12-25 13:00:00 | 112.00 | true | false |
滑动窗口聚合
滑动窗口聚合可以帮助处理呈现随时间累积的统计信息的特征值(例如总和、平均值等)。 SparkSQL Window 函数定义了数据中每一行的滑动窗口,在这些情况下非常有用。
对于每一行,对象 Window 都可以查看将来和过去。 在机器学习特征的上下文中,对每一行均应定义仅查找过去的 Window 对象。 有关更多详细信息,请访问“最佳做法”部分。
从此源数据开始:
user_id |
timestamp |
spend |
|---|---|---|
| 1 | 2022-12-10 06:00:00 | 12.00 |
| 2 | 2022-12-10 03:00:00 | 56.00 |
| 1 | 2022-12-11 01:00:00 | 10.00 |
| 2 | 2022-12-11 20:00:00 | 10.00 |
| 2 | 2022-12-12 02:00:00 | 100.00 |
定义名为 user_rolling_spend 的新特征集。 此特征集包括按用户滚动 1 天和 3 天的总支出:
from pyspark.sql import Dataframe
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.ml import Transformer
class UserRollingSpend(Transformer):
def _transform(df: Dataframe) -> Dataframe:
days = lambda i: i * 86400
w_1d = (Window.partitionBy("user_id").orderBy(F.col("timestamp").cast('long'))\
.rangeBetween(-days(1), 0))
w_3d = (Window.partitionBy("user_id").orderBy(F.col("timestamp").cast('long')).\
rangeBetween(-days(3), 0))
res = df.withColumn("spend_1d_sum", F.sum("spend").over(w_1d)) \
.withColumn("spend_3d_sum", F.sum("spend").over(w_3d)) \
.select("user_id", "timestamp", "spend_1d_sum", "spend_3d_sum")
return res
user_rolling_spend 特征集具有两个特征:
spend_1d_sum:双精度spend_3d_sum:双精度
这会显示其计算的特征值:
user_id |
timestamp |
spend_1d_sum |
spend_3d_sum |
|---|---|---|---|
| 1 | 2022-12-10 06:00:00 | 12.00 | 12.00 |
| 2 | 2022-12-10 03:00:00 | 56.00 | 56.00 |
| 1 | 2022-12-11 01:00:00 | 22.00 | 22.00 |
| 2 | 2022-12-11 20:00:00 | 10.00 | 66.00 |
| 2 | 2022-12-12 02:00:00 | 110.00 | 166.00 |
特征值计算使用当前行上的列,并结合范围中的前一行列。
翻转窗口聚合
翻转窗口可以聚合时序数据的数据。 将数据分组为固定大小、非重叠和连续时间窗口,然后将其聚合。 例如,用户可以根据每日或每小时聚合定义特征。 使用 pyspark.sql.functions.window 函数定义翻转窗口,以获取一致的结果。 输出特征 timestamp 应与每个翻转窗口的末尾保持一致。
从此源数据开始:
user_id |
timestamp |
spend |
|---|---|---|
| 1 | 2022-12-10 06:00:00 | 12.00 |
| 1 | 2022-12-10 16:00:00 | 10.00 |
| 2 | 2022-12-10 03:00:00 | 56.00 |
| 1 | 2022-12-11 01:00:00 | 10.00 |
| 2 | 2022-12-12 04:00:00 | 23.00 |
| 2 | 2022-12-12 12:00:00 | 10.00 |
定义名为 user_daily_spend 的新特征集:
from pyspark.sql import functions as F
from pyspark.ml import Transformer
from pyspark.sql.dataframe import DataFrame
class TransactionFeatureTransformer(Transformer):
def _transform(self, df: DataFrame) -> DataFrame:
df1 = df.groupBy("user_id", F.window("timestamp", windowDuration="1 day",slideDuration="1 day"))\
.agg(F.sum("spend").alias("daily_spend"))
df2 = df1.select("user_id", df1.window.end.cast("timestamp").alias("end"),"daily_spend")
df3 = df2.withColumn('timestamp', F.expr("end - INTERVAL 1 milliseconds")) \
.select("user_id", "timestamp","daily_spend")
return df3
user_daily_spend 特征集具有以下特征:
daily_spend:双精度
这会显示其计算的特征值:
user_id |
timestamp |
daily_spend |
|---|---|---|
| 1 | 2022-12-10 23:59:59 | 22.00 |
| 2 | 2022-12-10 23:59:59 | 56.00 |
| 1 | 2022-12-11 23:59:59 | 10.00 |
| 2 | 2022-12-12 23:59:59 | 33.00 |
交错窗口聚合
交错窗口聚合是翻转窗口聚合的次要变体。 交错窗口聚合将数据分组为固定大小的窗口。 但这些窗口可以相互重叠。 为此,请使用 pyspark.sql.functions.window,其中一个 slideDuration 小于 windowDuration。
从以下示例数据开始:
user_id |
timestamp |
spend |
|---|---|---|
| 1 | 2022-12-10 03:00:00 | 12.00 |
| 1 | 2022-12-10 09:00:00 | 10.00 |
| 1 | 2022-12-11 05:00:00 | 8.00 |
| 2 | 2022-12-12 14:00:00 | 56.00 |
定义名为 user_sliding_24hr_spend 的新特征集:
from pyspark.sql import functions as F
from pyspark.ml import Transformer
from pyspark.sql.dataframe import DataFrame
class TrsactionFeatureTransformer(Transformer):
def _transform(self, df: DataFrame) -> DataFrame:
df1 = df.groupBy("user_id", F.window("timestamp", windowDuration="1 day",slideDuration="6 hours"))\
.agg(F.sum("spend").alias("sliding_24hr_spend"))
df2 = df1.select("user_id", df1.window.end.cast("timestamp").alias("end"),"sliding_24hr_spend")
df3 = df2.withColumn('timestamp', F.expr("end - INTERVAL 1 milliseconds")) \
.select("user_id", "timestamp","sliding_24hr_spend")
return df3
user_sliding_24hr_spend 特征集具有一项特征:
sliding_24hr_spend:双精度
这会显示其计算的特征值:
user_id |
timestamp |
sliding_24hr_spend |
|---|---|---|
| 1 | 2022-12-10 05:59:59 | 12.00 |
| 1 | 2022-12-10 11:59:59 | 22.00 |
| 1 | 2022-12-10 17:59:59 | 22.00 |
| 1 | 2022-12-10 23:59:59 | 22.00 |
| 1 | 2022-12-11 05:59:59 | 18.00 |
| 1 | 2022-12-11 11:59:59 | 8.00 |
| 1 | 2022-12-11 17:59:59 | 8.00 |
| 1 | 2022-12-11 23:59:59 | 8.00 |
| 1 | 2022-12-12 05:59:59 | 18.00 |
| 2 | 2022-12-12 17:59:59 | 56.00 |
| 2 | 2022-12-12 23:59:59 | 56.00 |
| 2 | 2022-12-13 05:59:59 | 56.00 |
| 2 | 2022-12-13 11:59:59 | 56.00 |
定义特征转换 - 最佳做法
防止特征转换中的数据泄露
如果每个计算特征值的时间戳值为 ts_0,则仅根据时间戳值为 ts_0 或之前的 source 数据计算特征值。 这可以避免基于特征事件时间之后的数据进行特征计算,否则称为数据泄露。
数据泄漏通常发生在滑动/翻转/交错窗口聚合中。 这些最佳做法有助于避免泄漏:
- 滑动窗口聚合:定义窗口,以从每一行仅向后追溯查看
- 翻转/交错窗口聚合:根据每个窗口的末尾定义特征时间戳
此数据示例显示了良好和错误的示例数据:
| 聚合 | 良好的示例 | 数据泄露的错误示例 |
|---|---|---|
| 滑动窗口 | Window.partitionBy("user_id") .orderBy(F.col("timestamp").cast('long')) . rangeBetween(-days(4), 0) |
Window.partitionBy("user_id") .orderBy(F.col("timestamp").cast('long')) . rangeBetween(-days(2), days(2)) |
| 翻转/交错窗口 | df1 = df.groupBy("user_id", F.window("timestamp", windowDuration="1 day",slideDuration="1 day")) .agg(F.sum("spend").alias("daily_spend")) df2 = df1.select("user_id", df1.window. end.cast("timestamp").alias("timestamp"),"daily_spend") |
df1 = df.groupBy("user_id", F.window("timestamp", windowDuration="1 day",slideDuration="1 day")) .agg(F.sum("spend").alias("daily_spend")) df2 = df1.select("user_id", df1.window. start.cast("timestamp").alias("timestamp"),"daily_spend") |
特征转换定义中的数据泄漏可能会导致以下问题:
- 计算/具体化特征值中的错误
- 使用具体化特征值而不是即时计算的值时出现
get_offline_feature不一致
设置正确的 source_lookback
对于时序(滑动/翻转/交错窗口聚合)数据聚合,请正确设置 source_lookback 属性。 下图显示了特征(集)计算中源数据窗口与特征窗口之间的关系:
将 source_lookback 定义为时间增量值,它显示了给定时间戳的特征值所需的源数据范围。 此示例显示了常见转换类型的建议 source_lookback 值:
| 转换类型 | source_lookback |
|---|---|
| 行级转换 | 0(默认值) |
| 滑动窗口 | 转换器中最大窗口范围的大小。 例如, source_lookback = 3 天(如果特征集定义了 3 天滚动特征) source_lookback = 7 天(如果特征集同时定义了 3 天和 7 天滚动特征) |
| 翻转/交错窗口 | window 定义中 windowDuration 的值。 例如,使用 window("timestamp", windowDuration="1 day",slideDuration="6 hours) 时 source_lookback = 1 天 |
source_lookback 设置不正确可能会导致计算/具体化特征值错误。

