特征转换和最佳做法

本文介绍特征集规范、可用于该特征集的不同类型的转换以及相关的最佳做法。

特征集是由源数据转换生成的特征的集合。 特征集规范是特征集开发和本地测试的自包含定义。 在进行开发和本地测试后,可以在特征存储中将该特征集注册为特征集资产。 然后,你就拥有了可用作托管功能的版本控制和具体化。

定义特征集

FeatureSetSpec 定义了特征集。 此示例显示了特征集规范文件:

$schema: http://azureml/sdk-2-0/FeatureSetSpec.json

source:
  type: parquet
  path: abfs://file_system@account_name.dfs.core.chinacloudapi.cn/datasources/transactions-source/*.parquet
  timestamp_column: # name of the column representing the timestamp.
    name: timestamp
  source_delay:
    days: 0
    hours: 3
    minutes: 0
feature_transformation:
  transformation_code:
    path: ./transformation_code
    transformer_class: transaction_transform.TransactionFeatureTransformer
features:
  - name: transaction_7d_count
    type: long
  - name: transaction_amount_7d_sum
    type: double
  - name: transaction_amount_7d_avg
    type: double
  - name: transaction_3d_count
    type: long
  - name: transaction_amount_3d_sum
    type: double
  - name: transaction_amount_3d_avg
    type: double
index_columns:
  - name: accountID
    type: string
source_lookback:
  days: 7
  hours: 0
  minutes: 0
temporal_join_lookback:
  days: 1
  hours: 0
  minutes: 0

注意

featurestore 核心 SDK 可自动生成特征集规范 YAML本教程 提供了一个示例。

FeatureSetSpec 定义中,以下属性与特征转换相关:

  • source:定义了源数据和相关元数据,例如数据中的时间戳列。 目前仅支持时序源数据和特征。 source.timestamp_column 属性是必需的
  • feature_transformation.transformation_code:定义了特征转换器的代码文件夹位置
  • features:定义了特征转换器生成的特征架构
  • index_columns:定义了特征转换器生成的索引列架构
  • source_lookback:当特征处理时序(例如窗口聚合)数据的聚合时,将使用此属性。 此属性的值指示过去源数据所需的时间范围(适用于时间 T 时的特征值)。有关详细信息,请参阅“最佳做法”部分

如何计算特征?

定义 FeatureSetSpec 后,请调用 featureSetSpec.to_spark_dataframe(feature_window_start_ts, feature_window_end_ts) 来计算给定特征窗口中的特征。

计算在以下步骤中发生:

  • 从源数据读取数据。 source 定义了源数据。 按时间范围 [feature_window_start_ts - source_lookback, feature_window_end_ts) 筛选数据。 时间范围包括窗口的开始时间,但不包括窗口的结束时间
  • 对数据应用特征转换器(由 feature_transformation.transformation_code 定义)并获取计算特征
  • 筛选特征值以仅返回特征窗口 [feature_window_start_ts, feature_window_end_ts) 中的特征记录

在此代码示例中,特征存储 API 会计算特征:

# define the source data time window according to feature window
source_window_start_ts = feature_window_start_ts - source_lookback
source_window_end_ts = feature_window_end_ts

# read source table into a dataframe
df1 = spark.read.parquet(source.path).filter(df1["timestamp"] >= source_window_start_ts && df1["timestamp"] < source_window_end_ts)

# apply the feature transformer
df2 = FeatureTransformer._transform(df1)

## filter the feature(set) to include only feature records within the feature window
feature_set_df = df2.filter(df2["timestamp"] >= feature_window_start_ts && df2["timestamp"] < feature_window_end_ts)

Illustration showing feature set specification and corresponding transformations applied on source data to produce feature dataframe.

特征转换器函数的输出架构

转换函数会输出一个数据帧,它在其架构中包含以下值:

  • FeatureSetSpec 定义在名称和类型方面均匹配的索引列
  • source 中时间戳定义匹配的时间戳列(名称)。 source 位于 FeatureSetSpec
  • 将所有其他列名/类型值定义为 FeatureSetSpec 中的 features

实现适用于常见转换类型的特征转换器

行级转换

在行级别转换中,特定行上的特征值计算仅使用该行的列值。 从此源数据开始:

user_id timestamp total_spend
1 2022-12-19 06:00:00 12.00
2 2022-12-10 03:00:00 56.00
1 2022-12-25 13:00:00 112.00

定义名为 user_total_spend_profile 的新特征集:

from pyspark.sql import Dataframe
from pyspark.ml import Transformer

class UserTotalSpendProfileTransformer(Transformer):

    def _transform(df: Dataframe) -> Dataframe:
        df.withColumn("is_high_spend_user", col("total_spend") > 100.0) \
           .withColumn("is_low_spend_user", col("total_spend") < 20.0)

此特征集具有三个特征,其数据类型如下所示:

  • total_spend:双精度
  • is_high_spend_user:布尔
  • is_low_spend_user:布尔

这显示了计算的特征值:

user_id timestamp total_spend is_high_spend_user is_low_spend_user
1 2022-12-19 06:00:00 12.00 false true
2 2022-12-10 03:00:00 56.00 false false
1 2022-12-25 13:00:00 112.00 true false

滑动窗口聚合

滑动窗口聚合可以帮助处理呈现随时间累积的统计信息的特征值(例如总和、平均值等)。 SparkSQL Window 函数定义了数据中每一行的滑动窗口,在这些情况下非常有用。

对于每一行,对象 Window 都可以查看将来和过去。 在机器学习特征的上下文中,对每一行均应定义仅查找过去的 Window 对象。 有关更多详细信息,请访问“最佳做法”部分

从此源数据开始:

user_id timestamp spend
1 2022-12-10 06:00:00 12.00
2 2022-12-10 03:00:00 56.00
1 2022-12-11 01:00:00 10.00
2 2022-12-11 20:00:00 10.00
2 2022-12-12 02:00:00 100.00

定义名为 user_rolling_spend 的新特征集。 此特征集包括按用户滚动 1 天和 3 天的总支出:

from pyspark.sql import Dataframe
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.ml import Transformer

class UserRollingSpend(Transformer):

    def _transform(df: Dataframe) -> Dataframe:
        days = lambda i: i * 86400
        w_1d = (Window.partitionBy("user_id").orderBy(F.col("timestamp").cast('long'))\
                .rangeBetween(-days(1), 0))
        w_3d = (Window.partitionBy("user_id").orderBy(F.col("timestamp").cast('long')).\
                rangeBetween(-days(3), 0))
        res = df.withColumn("spend_1d_sum", F.sum("spend").over(w_1d)) \
            .withColumn("spend_3d_sum", F.sum("spend").over(w_3d)) \
            .select("user_id", "timestamp", "spend_1d_sum", "spend_3d_sum")
        return res
    

user_rolling_spend 特征集具有两个特征:

  • spend_1d_sum:双精度
  • spend_3d_sum:双精度

这会显示其计算的特征值:

user_id timestamp spend_1d_sum spend_3d_sum
1 2022-12-10 06:00:00 12.00 12.00
2 2022-12-10 03:00:00 56.00 56.00
1 2022-12-11 01:00:00 22.00 22.00
2 2022-12-11 20:00:00 10.00 66.00
2 2022-12-12 02:00:00 110.00 166.00

特征值计算使用当前行上的列,并结合范围中的前一行列。

翻转窗口聚合

翻转窗口可以聚合时序数据的数据。 将数据分组为固定大小、非重叠和连续时间窗口,然后将其聚合。 例如,用户可以根据每日或每小时聚合定义特征。 使用 pyspark.sql.functions.window 函数定义翻转窗口,以获取一致的结果。 输出特征 timestamp 应与每个翻转窗口的末尾保持一致。

从此源数据开始:

user_id timestamp spend
1 2022-12-10 06:00:00 12.00
1 2022-12-10 16:00:00 10.00
2 2022-12-10 03:00:00 56.00
1 2022-12-11 01:00:00 10.00
2 2022-12-12 04:00:00 23.00
2 2022-12-12 12:00:00 10.00

定义名为 user_daily_spend 的新特征集:

from pyspark.sql import functions as F
from pyspark.ml import Transformer
from pyspark.sql.dataframe import DataFrame

class TransactionFeatureTransformer(Transformer):
    def _transform(self, df: DataFrame) -> DataFrame:
        df1 = df.groupBy("user_id", F.window("timestamp", windowDuration="1 day",slideDuration="1 day"))\
                .agg(F.sum("spend").alias("daily_spend"))
        df2 = df1.select("user_id", df1.window.end.cast("timestamp").alias("end"),"daily_spend")
        df3 = df2.withColumn('timestamp', F.expr("end - INTERVAL 1 milliseconds")) \
              .select("user_id", "timestamp","daily_spend")
        return df3

user_daily_spend 特征集具有以下特征:

  • daily_spend:双精度

这会显示其计算的特征值:

user_id timestamp daily_spend
1 2022-12-10 23:59:59 22.00
2 2022-12-10 23:59:59 56.00
1 2022-12-11 23:59:59 10.00
2 2022-12-12 23:59:59 33.00

交错窗口聚合

交错窗口聚合是翻转窗口聚合的次要变体。 交错窗口聚合将数据分组为固定大小的窗口。 但这些窗口可以相互重叠。 为此,请使用 pyspark.sql.functions.window,其中一个 slideDuration 小于 windowDuration

从以下示例数据开始:

user_id timestamp spend
1 2022-12-10 03:00:00 12.00
1 2022-12-10 09:00:00 10.00
1 2022-12-11 05:00:00 8.00
2 2022-12-12 14:00:00 56.00

定义名为 user_sliding_24hr_spend 的新特征集:

from pyspark.sql import functions as F
from pyspark.ml import Transformer
from pyspark.sql.dataframe import DataFrame

class TrsactionFeatureTransformer(Transformer):
    def _transform(self, df: DataFrame) -> DataFrame:
        df1 = df.groupBy("user_id", F.window("timestamp", windowDuration="1 day",slideDuration="6 hours"))\
                .agg(F.sum("spend").alias("sliding_24hr_spend"))
        df2 = df1.select("user_id", df1.window.end.cast("timestamp").alias("end"),"sliding_24hr_spend")
        df3 = df2.withColumn('timestamp', F.expr("end - INTERVAL 1 milliseconds")) \
              .select("user_id", "timestamp","sliding_24hr_spend")
        return df3

user_sliding_24hr_spend 特征集具有一项特征:

  • sliding_24hr_spend:双精度

这会显示其计算的特征值:

user_id timestamp sliding_24hr_spend
1 2022-12-10 05:59:59 12.00
1 2022-12-10 11:59:59 22.00
1 2022-12-10 17:59:59 22.00
1 2022-12-10 23:59:59 22.00
1 2022-12-11 05:59:59 18.00
1 2022-12-11 11:59:59 8.00
1 2022-12-11 17:59:59 8.00
1 2022-12-11 23:59:59 8.00
1 2022-12-12 05:59:59 18.00
2 2022-12-12 17:59:59 56.00
2 2022-12-12 23:59:59 56.00
2 2022-12-13 05:59:59 56.00
2 2022-12-13 11:59:59 56.00

定义特征转换 - 最佳做法

防止特征转换中的数据泄露

如果每个计算特征值的时间戳值为 ts_0,则仅根据时间戳值为 ts_0 或之前的 source 数据计算特征值。 这可以避免基于特征事件时间之后的数据进行特征计算,否则称为数据泄露

数据泄漏通常发生在滑动/翻转/交错窗口聚合中。 这些最佳做法有助于避免泄漏:

  • 滑动窗口聚合:定义窗口,以从每一行仅向后追溯查看
  • 翻转/交错窗口聚合:根据每个窗口的末尾定义特征时间戳

此数据示例显示了良好和错误的示例数据:

聚合 良好的示例 数据泄露的错误示例
滑动窗口 Window.partitionBy("user_id")
.orderBy(F.col("timestamp").cast('long'))
.rangeBetween(-days(4), 0)
Window.partitionBy("user_id")
.orderBy(F.col("timestamp").cast('long'))
.rangeBetween(-days(2), days(2))
翻转/交错窗口 df1 = df.groupBy("user_id", F.window("timestamp", windowDuration="1 day",slideDuration="1 day"))
.agg(F.sum("spend").alias("daily_spend"))

df2 = df1.select("user_id", df1.window.end.cast("timestamp").alias("timestamp"),"daily_spend")
df1 = df.groupBy("user_id", F.window("timestamp", windowDuration="1 day",slideDuration="1 day"))
.agg(F.sum("spend").alias("daily_spend"))

df2 = df1.select("user_id", df1.window.start.cast("timestamp").alias("timestamp"),"daily_spend")

特征转换定义中的数据泄漏可能会导致以下问题:

  • 计算/具体化特征值中的错误
  • 使用具体化特征值而不是即时计算的值时出现 get_offline_feature 不一致

设置正确的 source_lookback

对于时序(滑动/翻转/交错窗口聚合)数据聚合,请正确设置 source_lookback 属性。 下图显示了特征(集)计算中源数据窗口与特征窗口之间的关系:

Illustration showing the concept of source_lookback.

source_lookback 定义为时间增量值,它显示了给定时间戳的特征值所需的源数据范围。 此示例显示了常见转换类型的建议 source_lookback 值:

转换类型 source_lookback
行级转换 0(默认值)
滑动窗口 转换器中最大窗口范围的大小。
例如,
source_lookback = 3 天(如果特征集定义了 3 天滚动特征)
source_lookback = 7 天(如果特征集同时定义了 3 天和 7 天滚动特征)
翻转/交错窗口 window 定义中 windowDuration 的值。 例如,使用 window("timestamp", windowDuration="1 day",slideDuration="6 hours) 时 source_lookback = 1 天

source_lookback 设置不正确可能会导致计算/具体化特征值错误。

后续步骤