重要
此功能在 Beta 版中。 工作区管理员可以从 预览 页控制对此功能的访问。 请参阅 Manage Azure Databricks 预览版。
创建声明性功能定义(存储在 Unity 目录中)后,可以使用功能定义从源表生成特征数据。 此过程称为特征具体化。 Azure Databricks 创建和管理 Lakeflow Spark 声明性管道,以填充 Unity 目录中的表,以便进行模型训练和批量评分或联机服务。
有关提供声明性功能的信息,请参阅 Serve 声明性功能。
要求
- 必须使用声明性功能 API 创建功能,并将其存储在 Unity 目录中。
- 有关版本要求,请参阅 要求。
-
ColumnSelection功能可以具体化到在线商店。 请参阅 列选择物化。 -
RequestSource特征无法具体化,因为它们表示推理时提供的数据。
API 数据结构
OfflineStoreConfig
用于存储物化特征的离线存储配置。 当调用 materialize_features 时,特征存储后端使用此前缀创建表。 每次管道运行根据物化调度将最新特征值物化到表中。
OfflineStoreConfig(
catalog_name: str, # Catalog name for the offline table where materialized features will be stored
schema_name: str, # Schema name for the offline table
table_name_prefix: str # Table name prefix for the offline table. The pipeline may create multiple tables with this prefix, each updated at different cadences
)
from databricks.feature_engineering.entities import OfflineStoreConfig
offline_store = OfflineStoreConfig(
catalog_name="main",
schema_name="feature_store",
table_name_prefix="customer_features"
)
OnlineStoreConfig
在线商店的配置,用于存储模型服务使用的功能。 物化生成catalog.schema.table_name_prefixDelta表,并将这些表流式传输到具有相同名称的在线特征库。
from databricks.feature_engineering.entities import OnlineStoreConfig
online_store = OnlineStoreConfig(
catalog_name="main",
schema_name="feature_store",
table_name_prefix="customer_features_serving",
online_store_name="customer_features_store"
)
MaterializedFeature
表示已具体化的声明式特性,即在 Unity Catalog 中提供的预计算表示形式。 脱机表和联机表具有特定的具体实现特性。 通常,用户不会直接实例化 MaterializedFeature 。
API 函数调用
materialize_features()
将声明性特征列表具体化为离线 Delta 表或在线特征存储。 在调用此函数之前,必须在 Unity 目录中注册功能(例如,使用 create_feature 或 register_feature)。 未注册的本地构造功能将不起作用。
FeatureEngineeringClient.materialize_features(
features: List[Feature], # List of declarative features to materialize
offline_config: Optional[OfflineStoreConfig] = None, # Offline store config (aggregation features only)
online_config: Optional[OnlineStoreConfig] = None, # Online store config
trigger: Union[CronSchedule, TableTrigger], # Materialization trigger
) -> List[MaterializedFeature]:
该方法返回具体化特征的列表,其中包含有关何时更新特征值的元数据,以及具体化功能的 Unity 目录表。
如果同时提供一个 OnlineStoreConfig 和一个 OfflineStoreConfig ,则为每个提供的功能返回两个具体化特征,每个类型的存储都有一个。
参数 trigger 控制材质化管道的运行时间:
-
CronSchedule:按固定计划运行。 聚合特征(AggregationFunction) 是必需的。 -
TableTrigger:当上游 Delta 表收到提交时运行。ColumnSelection支持的功能DeltaTableSource是必要的。
不能在单个ColumnSelection调用中混合materialize_features和聚合功能,因为它们需要不同的触发器类型。 改为发出单独的调用,而不是合并调用。
具体化为脱机存储
from databricks.feature_engineering import FeatureEngineeringClient
from databricks.feature_engineering.entities import (
CronSchedule, MaterializedFeaturePipelineScheduleState, OfflineStoreConfig,
)
fe = FeatureEngineeringClient()
materialized = fe.materialize_features(
features=features,
offline_config=OfflineStoreConfig(
catalog_name="main",
schema_name="feature_store",
table_name_prefix="customer_features"
),
trigger=CronSchedule(
quartz_cron_expression="0 0 * * * ?", # Hourly
timezone_id="UTC",
pipeline_schedule_state=MaterializedFeaturePipelineScheduleState.ACTIVE,
),
)
具体化到在线商店
注释
若要将聚合功能应用于在线商店,还必须应用于线下商店。
offline_config 和 online_config 均为必需。
online_store_name必须引用现有的在线特征库。 有关创建一个的说明,请参阅 Databricks Online 特征库。
ColumnSelection 功能不需要 OfflineStoreConfig. 请参阅 列选择物化。
from databricks.feature_engineering import FeatureEngineeringClient
from databricks.feature_engineering.entities import (
CronSchedule, MaterializedFeaturePipelineScheduleState,
OfflineStoreConfig, OnlineStoreConfig,
)
fe = FeatureEngineeringClient()
materialized = fe.materialize_features(
features=features,
offline_config=OfflineStoreConfig(
catalog_name="main",
schema_name="feature_store",
table_name_prefix="customer_features"
),
online_config=OnlineStoreConfig(
catalog_name="main",
schema_name="feature_store",
table_name_prefix="customer_features_serving",
online_store_name="customer_features_store"
),
trigger=CronSchedule(
quartz_cron_expression="0 0 * * * ?", # Hourly
timezone_id="UTC",
pipeline_schedule_state=MaterializedFeaturePipelineScheduleState.ACTIVE,
),
)
list_materialized_features()
返回用户 Unity Catalog 元数据存储中所有已具体化特性的列表。
默认情况下,最多返回 100 个功能。 可以使用参数更改此限制 max_results 。
若要按特征名称筛选返回的具体化特征,请使用可选 feature_name 参数。
FeatureEngineeringClient.list_materialized_features(
feature_name: Optional[str] = None, # Optional feature name to filter by
max_results: int = 100, # Maximum number of features to be returned
) -> List[MaterializedFeature]:
delete_materialized_feature()
在删除具体化功能之前,请删除或更新引用该功能的任何模型或功能规格。
删除已具体化的特性。 要传递的功能取决于功能类型:
- 聚合特征:传递脱机物化特征。 如果同一功能存在联机具体化功能,则会删除两者。
-
ColumnSelection特征:传递联机已生成的特征。ColumnSelection功能仅具体化到在线商店(请参阅 ColumnSelection 具体化),因此没有配对的脱机功能。
具体化过程中,功能按数据源和汇总时段进行组合,以提高效率。
ColumnSelection 功能没有聚合窗口,因此它们仅按数据源分组。 在删除所有分组特征之前,实体化管道、离线表和在线表都不会被删除。 当删除某个组中的最后一个实体化特征时,特征存储会将相关资源安排为由后台进程自动清理。 请参阅 后台资源清理。
若要清理具体化特征,请查看与具体化特征关联的表。 在清理计算和 Delta 表资源之前,必须删除表中的每个功能(每列一个)。
使用 list_materialized_features() 获取 materialized_feature 参数。
FeatureEngineeringClient.delete_materialized_feature(
materialized_feature: MaterializedFeature, # Required: The materialized feature to delete
) -> None
from databricks.feature_engineering import FeatureEngineeringClient
from databricks.feature_engineering.entities import ColumnSelection
fe = FeatureEngineeringClient()
feature_names = [
"main.feature_store.amount_sum_sliding_7d_1d",
"main.feature_store.amount_sum_sliding_30d_1d",
"main.feature_store.transaction_count_sliding_7d_1d",
"main.feature_store.latest_transaction_amount",
"main.feature_store.latest_user_tier",
]
for name in feature_names:
feature = fe.get_feature(full_name=name)
for mf in fe.list_materialized_features(feature_name=name):
if isinstance(feature.function, ColumnSelection):
# ColumnSelection features only have online materializations. Delete the online materialized feature directly.
fe.delete_materialized_feature(materialized_feature=mf)
elif not mf.is_online:
# Aggregation features have both offline and online materializations. Delete the offline materialized feature to delete both.
fe.delete_materialized_feature(materialized_feature=mf)
# Online materialized aggregation features cannot be deleted directly. They are deleted via their paired offline materialized features.
ColumnSelection 具体化
ColumnSelection 功能为每个实体键选择单个列的最新值,而无需聚合。 它们只能具体化到在线商店。 对于脱机用例(训练和批处理推理), ColumnSelection 功能在查询时直接从源数据中提取,因此不需要脱机具体化。
具体化行为
- 管道将每个实体键值的最新行写入在线表,没有聚合窗口。
- 在线实例化将当前每个实体键的最新值填充到联机表中。
示例
from databricks.feature_engineering import FeatureEngineeringClient
from databricks.feature_engineering.entities import (
DeltaTableSource, Feature, ColumnSelection, TableTrigger, OnlineStoreConfig,
)
fe = FeatureEngineeringClient()
delta_source = DeltaTableSource(
catalog_name="catalog",
schema_name="schema",
table_name="transactions",
)
amount_feature = Feature(
source=delta_source,
function=ColumnSelection("amount"),
entity=["user_id"],
timeseries_column="transaction_time",
name="latest_transaction_amount",
)
# Register before materializing
amount_feature = fe.register_feature(
feature=amount_feature,
catalog_name="catalog",
schema_name="schema",
)
mfs = fe.materialize_features(
features=[amount_feature],
online_config=OnlineStoreConfig(
catalog_name="catalog",
schema_name="feats_online",
table_name_prefix="txn_",
online_store_name="lb_usw2"
),
trigger=TableTrigger(),
)
每当源 Delta 表收到新提交时,ColumnSelection 功能使用 TableTrigger 来运行该管道。 不需要 offline_config ,因为 ColumnSelection 功能直接从源读取脱机用例(训练和批处理推理)。
注释
RequestSource 特征无法具体化,因为它们表示调用方在推理时提供的数据(或在训练时从标记的数据帧中提取)。 没有要从中读取的源表 — 这些值仅存在于请求有效负载或训练数据帧中。
后台资源清理
删除具体化功能时,Databricks 会立即删除功能元数据。 关联的基础结构(表、管道和作业)由后台进程异步清理。
由于多个具体化特征可以共享相同的表和管道,因此,只有在引用这些资源的所有具体化特征都已被删除后,这些共享资源才会被删除。 当共享一组表的最后一个物化特征被删除时,后台进程会自动删除以下资源:
- 包含物化特征数据的离线 Delta 表
- 联机表(如果功能已具体化到在线商店)
- 物化管道
- 编排作业
此后台进程使用由 Databricks 托管的系统服务主体,为你执行这些清理操作,包括删除工作区中的表、管道和作业。 无需你执行任何操作 — 清理完全由功能存储区管理。
注释
删除组中最后具体化功能以及删除关联的表和其他资源之间可能存在短暂的延迟。
局限性
- 批处理滚动窗口特性无法实现。 由于时间正确性高保真度,因此会为每个数据点动态生成用于脱机训练或批量推理的滚动窗口功能。
-
ColumnSelection功能只能具体化到在线商店。 -
RequestSource功能无法实现。 - 具体化特征只能在生成它们的工作区中删除。
- 对于具体化聚合特征,无法直接删除联机具体化特征。 删除配对的脱机物化特征,更改将同步传播至两者。
- 对于在 2026 年 4 月 20 日之前创建的具体化聚合特征,具体化管道将继续生成新的特征值,直到删除管道中的所有具体化特征,这会触发资源清理。 若要创建支持按功能删除的更新管道,请删除并重新具体化该功能。
- 对于具体化特征的处理,具体化
ColumnSelection管道会继续生成新的特征值,直到管道中的所有具体化特征被删除,这将触发资源清理。