重要
此功能在 Beta 版中。 工作区管理员可以从 预览 页控制对此功能的访问。 请参阅 Manage Azure Databricks 预览版。
使用声明性特征工程 API,可以从数据源定义和计算特征。 可以使用多种来源(Delta 表和请求时数据)以及多种计算方法(时间窗口聚合、简单列选择等)来定义特征。 本指南介绍以下工作流:
-
功能开发 工作流
- 用于
create_feature定义可在模型训练和服务工作流中使用的 Unity Catalog 功能对象。 - 或者在本地创建
Feature对象,并稍后使用register_feature将它们持久化到 Unity Catalog。 在本地构建的特性可以在注册之前与它一起使用create_training_set。
- 用于
-
模型训练 工作流
- 使用
create_training_set来计算机器学习中的时间点聚合特征。 有关使用声明性功能训练的详细文档,请参阅 使用声明性特征训练模型。
- 使用
-
特征具体化和服务 工作流
- 在定义了
create_feature功能或使用get_feature检索到该功能之后,可以使用materialize_features将该功能或功能集物化到离线存储,以便高效重复使用,或者将功能物化到在线商店以进行在线服务。 - 使用
create_training_set和物化视图准备一个脱机批量训练数据集。
- 在定义了
有关 API 详细信息,请参阅 声明性功能 API 参考。
要求
运行 Databricks Runtime 17.0 ML 或更高版本 的经典计算 群集。
必须安装自定义 Python 包。 每次运行笔记本时,运行以下代码行:
%pip install databricks-feature-engineering>=0.15.0 dbutils.library.restartPython()
快速入门示例
有关可运行的快速入门笔记本,请参阅 示例笔记本。
from databricks.feature_engineering import FeatureEngineeringClient
from databricks.feature_engineering.entities import (
CronSchedule, DeltaTableSource, Feature, AggregationFunction,
MaterializedFeaturePipelineScheduleState,
Sum, Avg, ColumnSelection, TableTrigger,
TumblingWindow, SlidingWindow,
OfflineStoreConfig, OnlineStoreConfig,
)
from datetime import timedelta
CATALOG_NAME = "main"
SCHEMA_NAME = "feature_store"
TABLE_NAME = "transactions"
# 1. Create data source
source = DeltaTableSource(
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
table_name=TABLE_NAME,
)
# 2. Define features locally (no catalog/schema needed yet)
avg_feature = Feature(
source=source,
entity=["user_id"],
timeseries_column="transaction_time",
function=AggregationFunction(Avg(input="amount"), TumblingWindow(window_duration=timedelta(days=30))),
name="avg_transaction_30d",
)
sum_feature = Feature(
source=source,
entity=["user_id"],
timeseries_column="transaction_time",
function=AggregationFunction(Sum(input="amount"), SlidingWindow(window_duration=timedelta(days=7), slide_duration=timedelta(days=1))),
# name auto-generated: "amount_sum_sliding_7d_1d"
)
fe = FeatureEngineeringClient()
# 3. Explore features with compute_features
feature_df = fe.compute_features(features=[avg_feature, sum_feature])
feature_df.display()
# 4. Create training set using local features
# `labeled_df` should have columns "user_id", "transaction_time", and "target".
training_set = fe.create_training_set(
df=labeled_df,
features=[avg_feature, sum_feature],
label="target",
)
training_set.load_df().display()
# 5. Register features in Unity Catalog
avg_feature = fe.register_feature(
feature=avg_feature,
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
)
sum_feature = fe.register_feature(
feature=sum_feature,
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
)
# 6. Or use create_feature for a one-step define-and-register workflow
latest_amount = fe.create_feature(
source=source,
function=ColumnSelection("amount"),
entity=["user_id"],
timeseries_column="transaction_time",
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
name="latest_amount",
)
# 7. Train model
with mlflow.start_run():
training_df = training_set.load_df()
# training code
fe.log_model(
model=model,
artifact_path="recommendation_model",
flavor=mlflow.sklearn,
training_set=training_set,
registered_model_name=f"{CATALOG_NAME}.{SCHEMA_NAME}.recommendation_model",
)
# 8. (Optional) Materialize features for serving
# Features must be registered in UC before calling materialize_features
online_config = OnlineStoreConfig(
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
table_name_prefix="customer_features_serving",
online_store_name="customer_features_store",
)
# Aggregation features use CronSchedule and support both offline and online configs
fe.materialize_features(
features=[avg_feature, sum_feature],
offline_config=OfflineStoreConfig(
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
table_name_prefix="customer_features",
),
online_config=online_config,
trigger=CronSchedule(
quartz_cron_expression="0 0 * * * ?", # Hourly
timezone_id="UTC",
pipeline_schedule_state=MaterializedFeaturePipelineScheduleState.ACTIVE,
),
)
# ColumnSelection features use TableTrigger and only support online config
fe.materialize_features(
features=[latest_amount],
online_config=online_config,
trigger=TableTrigger(),
)
示例笔记本
声明性功能快速入门笔记本
模型训练和推理
若要使用声明性特征log_model()、score_batch()和create_training_set()训练模型并运行批处理推理,请参阅使用声明性特征训练模型。
特征具体化
定义功能后,可以将这些功能具体化为脱机或在线商店,以便在培训和服务工作流中高效重复使用。 具体化功能后,可以使用 CPU 模型服务为模型提供服务。 有关详细信息,请参阅 具体化声明性功能。
最佳做法
功能命名
- 对业务关键功能使用描述性名称。
- 跨团队遵循一致的命名约定。
- 开始开发功能时,请使用 自动生成的名称 。
时间范围
- 将窗口边界与业务周期(每日、每周)对齐。
- 较短的窗口可以捕捉到最近的趋势,但可能会有较多噪声。 较长的窗口会产生更稳定的功能分布,但可能会错过最近的行为转变。 根据用例中的信号变化速度来选择。 例如,7 天窗口可平滑每日波动并生成一致的模型输入,而 1 小时窗口对行为更改做出快速反应,但可能会引入降低模型性能的方差。 如果模型的准确性在分布移动时下降,请使用更长的窗口来稳定输入。
- 翻转和滑动窗口比滚动(连续)窗口更具可伸缩性。 从滑动窗口开始,适用于大多数用例。
Performance
- 在单个
materialize_features调用中具体化来自同一数据源的功能,以最大程度地减少数据扫描。 - 对同一数据源上的功能使用相同的粒度(例如,所有 1 小时或全部 1 天幻灯片持续时间),以便在具体化期间更好地分组。
实体列与筛选条件
使用同一源表中的功能时,请使用此决策指南:
在需要不同聚合级别时,请使用 entity (在 create_feature):
-
客户级功能 (每个客户一行):
entity=["customer_id"] -
客户-商家功能 (每个客户多行):
entity=["customer_id", "merchant_id"] -
不同的聚合级别可以共享相同的
DeltaTableSource— 在每个功能定义上指定不同的entity值
需要在同一聚合级别筛选行时,请使用 filter_condition (on DeltaTableSource):
-
仅高价值交易:
filter_condition="amount > 100"(仍按客户聚合) -
仅包含已完成订单:
filter_condition="status = 'completed'"(仍按每位客户进行聚合)
经验法则: 如果您的更改会导致每个实体值生成不同数量的行,请在功能定义中使用不同的entity值。 如果只是筛选哪些行促成了相同的聚合,请对源使用 filter_condition 。
常见模式
客户分析
from databricks.feature_engineering.entities import AggregationFunction, Sum, Count, RollingWindow
fe = FeatureEngineeringClient()
features = [
# Recency: Number of transactions in the last day
fe.create_feature(catalog_name="main", schema_name="ecommerce", source=transactions,
entity=["user_id"], timeseries_column="transaction_time",
function=AggregationFunction(Count(input="transaction_id"), RollingWindow(window_duration=timedelta(days=1)))),
# Frequency: transaction count over the last 90 days
fe.create_feature(catalog_name="main", schema_name="ecommerce", source=transactions,
entity=["user_id"], timeseries_column="transaction_time",
function=AggregationFunction(Count(input="transaction_id"), RollingWindow(window_duration=timedelta(days=90)))),
# Monetary: total spend in the last month
fe.create_feature(catalog_name="main", schema_name="ecommerce", source=transactions,
entity=["user_id"], timeseries_column="transaction_time",
function=AggregationFunction(Sum(input="amount"), RollingWindow(window_duration=timedelta(days=30)))),
]
走向分析
# Compare recent vs. historical behavior
fe = FeatureEngineeringClient()
recent_avg = fe.create_feature(
catalog_name="main", schema_name="ecommerce",
source=transactions, entity=["user_id"], timeseries_column="transaction_time",
function=AggregationFunction(Avg(input="amount"), RollingWindow(window_duration=timedelta(days=7))),
)
historical_avg = fe.create_feature(
catalog_name="main", schema_name="ecommerce",
source=transactions, entity=["user_id"], timeseries_column="transaction_time",
function=AggregationFunction(Avg(input="amount"), RollingWindow(window_duration=timedelta(days=7), delay=timedelta(days=7))),
)
季节性模式
# Same day of week, 4 weeks ago
fe = FeatureEngineeringClient()
weekly_pattern = fe.create_feature(
catalog_name="main", schema_name="ecommerce",
source=transactions, entity=["user_id"], timeseries_column="transaction_time",
function=AggregationFunction(Avg(input="amount"), RollingWindow(window_duration=timedelta(days=1), delay=timedelta(weeks=4))),
)
局限性
- 实体和时间序列列的名称必须与在 API 中使用的
create_training_set训练(已标记)数据集和特征定义之间匹配。 - 训练数据集中用作
label列的列名不应存在于用于定义Features 的源表中。 - API 支持的函数列表(UDAF)是有限的。 请参阅 支持的函数。
- 实体列不能为类型
DATE或类型TIMESTAMP。 -
RequestSource仅支持在ScalarDataType(INTEGER、FLOAT、BOOLEAN、STRING、DOUBLE、LONG、TIMESTAMP、DATE、SHORT)中定义的标量数据类型。 不支持复杂类型,如数组、映射和结构。 -
RequestSource不支持聚合函数或时间窗口。 只能使用ColumnSelection函数。 - 实体列名称集、时间序列列名称集和请求特征列名称必须在训练集或服务终端中的所有来源中具有全局唯一性。
有关具体化限制的详细信息,请参阅 限制。