处理特征表
本页介绍了如何创建和使用 Unity Catalog 中的特征表。
此页面仅适用于为 Unity Catalog 启用的工作区。 如果没有为你的工作区启用 Unity Catalog,请参阅使用工作区特征存储中的特征表。
有关此页面上的示例中使用的命令和参数的详细信息,请参阅特征工程 Python API 参考。
要求
Unity Catalog 中的特征工程需要 Databricks Runtime 13.2 或更高版本。 此外,Unity Catalog 元存储必须具有特权模型版本 1.0。
在 Unity Catalog Python 客户端中安装特征工程
Unity Catalog 中的特征工程有一个 Python 客户端 FeatureEngineeringClient
。 该类在 PyPI 上的 databricks-feature-engineering
包中提供,并且预安装在 Databricks Runtime 13.3 LTS ML 及更高版本中。 如果使用非 ML Databricks Runtime,则必须手动安装客户端。 使用兼容性矩阵可找到适用于你的 Databricks Runtime 版本的正确版本。
%pip install databricks-feature-engineering
dbutils.library.restartPython()
创建 Unity Catalog 中特征表的目录和架构
必须新建目录或使用现有目录作为特征表。
要新建目录,必须具有对元存储的CREATE CATALOG
特权。
CREATE CATALOG IF NOT EXISTS <catalog-name>
要使用现有目录,必须具有对目录的USE CATALOG
特权。
USE CATALOG <catalog-name>
Unity Catalog 中的特征表必须存储在架构中。 要在目录中新建架构,必须具有对目录的CREATE SCHEMA
特权。
CREATE SCHEMA IF NOT EXISTS <schema-name>
创建 Unity Catalog 中的特征表
注意
可以使用 Unity Catalog 中的现有 Delta 表,该表包含主键约束作为特征表。 如果表未定义主键,则必须使用 ALTER TABLE
DDL 语句更新表以添加约束。 请参阅使用 Unity Catalog 中的现有 Delta 表作为特征表。
但要向通过 Delta Live Tables 管道发布到 Unity Catalog 的流式处理表或具体化视图添加主键,则需要修改流式处理表或具体化视图定义的架构,使其包含主键,然后刷新流式处理表或具体化视图。 请参阅使用由 Delta Live Tables 管道创建的流式处理表或具体化视图作为特征表。
Unity Catalog 中的特征表为 Delta 表。 特征表必须具有主键。 特征表(如 Unity Catalog 中的其他数据资产)可使用三级命名空间进行访问:<catalog-name>.<schema-name>.<table-name>
。
可以使用 Databricks SQL、Python FeatureEngineeringClient
或 Delta Live Tables 管道在 Unity Catalog 中创建特征表。
Databricks SQL
可以使用具有主键约束的 Delta 表作为特征表。 下面的代码演示如何使用主键创建表:
CREATE TABLE ml.recommender_system.customer_features (
customer_id int NOT NULL,
feat1 long,
feat2 varchar(100),
CONSTRAINT customer_features_pk PRIMARY KEY (customer_id)
);
若要创建时序特征表,请将时间列添加为主键列,并指定 TIMESERIES 关键字。 TIMESERIES 关键字需要 Databricks Runtime 13.3 LTS 或更高版本。
CREATE TABLE ml.recommender_system.customer_features (
customer_id int NOT NULL,
ts timestamp NOT NULL,
feat1 long,
feat2 varchar(100),
CONSTRAINT customer_features_pk PRIMARY KEY (customer_id, ts TIMESERIES)
);
创建表后,可以像写入其他 Delta 表一样将数据写入表中,并可将该表用作特征表。
Python
有关以下示例中使用的命令和参数的详细信息,请参阅特征工程 Python API 参考。
- 编写 Python 函数来计算特征。 每个函数的输出应是具有唯一主键的 Apache Spark 数据帧。 主键可由一个或多个列组成。
- 实例化
FeatureEngineeringClient
并使用create_table
以创建特征表。 - 使用
write_table
填充特征表。
from databricks.feature_engineering import FeatureEngineeringClient
fe = FeatureEngineeringClient()
# Prepare feature DataFrame
def compute_customer_features(data):
''' Feature computation code returns a DataFrame with 'customer_id' as primary key'''
pass
customer_features_df = compute_customer_features(df)
# Create feature table with `customer_id` as the primary key.
# Take schema from DataFrame output by compute_customer_features
customer_feature_table = fe.create_table(
name='ml.recommender_system.customer_features',
primary_keys='customer_id',
schema=customer_features_df.schema,
description='Customer features'
)
# An alternative is to use `create_table` and specify the `df` argument.
# This code automatically saves the features to the underlying Delta table.
# customer_feature_table = fe.create_table(
# ...
# df=customer_features_df,
# ...
# )
# To use a composite primary key, pass all primary key columns in the create_table call
# customer_feature_table = fe.create_table(
# ...
# primary_keys=['customer_id', 'date'],
# ...
# )
# To create a time series table, set the timeseries_columns argument
# customer_feature_table = fe.create_table(
# ...
# primary_keys=['customer_id', 'date'],
# timeseries_columns='date',
# ...
# )
使用增量实时表管道在 Unity Catalog 中创建特征表
从 Delta Live Tables 管道发布的任何表(包括主键约束)都可以用作特征表。 若要在增量实时表管道中创建一个包含主键的表,可以使用 Databricks SQL 或增量实时表 Python 编程接口。
若要在增量实时表管道中创建一个包含主键的表,可以使用以下语法:
Databricks SQL
CREATE LIVE TABLE customer_features (
customer_id int NOT NULL,
feat1 long,
feat2 varchar(100),
CONSTRAINT customer_features_pk PRIMARY KEY (customer_id)
) AS SELECT * FROM ...;
Python
import dlt
@dlt.table(
schema="""
customer_id int NOT NULL,
feat1 long,
feat2 varchar(100),
CONSTRAINT customer_features_pk PRIMARY KEY (customer_id)
""")
def customer_features():
return ...
若要创建时序特征表,请将时间列添加为主键列,并指定 TIMESERIES 关键字。
Databricks SQL
CREATE LIVE TABLE customer_features (
customer_id int NOT NULL,
ts timestamp NOT NULL,
feat1 long,
feat2 varchar(100),
CONSTRAINT customer_features_pk PRIMARY KEY (customer_id, ts TIMESERIES)
) AS SELECT * FROM ...;
Python
import dlt
@dlt.table(
schema="""
customer_id int NOT NULL,
ts timestamp NOT NULL,
feat1 long,
feat2 varchar(100),
CONSTRAINT customer_features_pk PRIMARY KEY (customer_id, ts TIMESERIES)
""")
def customer_features():
return ...
创建表后,可以像写入其他 Delta Live Tables 数据集一样将数据写入表中,并可将该表用作特征表。
使用 Unity Catalog 中的现有 Delta 表作为特征表
Unity Catalog 中具有主键的任何 Delta 表都可以是 Unity Catalog 中的特征表,可以将特征 UI 和 API 与表配合使用。
注意
- 只有表所有者才能声明主键约束。 所有者的姓名显示在目录资源管理器的表详细信息页面。
- 验证 Unity 目录中的特征工程是否支持 Delta 表中的数据类型。 请参阅支持的数据类型。
- TIMESERIES 关键字需要 Databricks Runtime 13.3 LTS 或更高版本。
如果现有 Delta 表没有主键约束,则可以按如下所示创建一个:
将主键列设置为
NOT NULL
。 对于每个主键列,请运行:ALTER TABLE <full_table_name> ALTER COLUMN <pk_col_name> SET NOT NULL
更改表以添加主键约束:
ALTER TABLE <full_table_name> ADD CONSTRAINT <pk_name> PRIMARY KEY(pk_col1, pk_col2, ...)
pk_name
是主键约束的名称。 按照约定,可以使用带_pk
后缀的表名(无架构和目录)。 例如,名称为"ml.recommender_system.customer_features"
的表使用customer_features_pk
作为其主键约束的名称。若要使表成为时序特征表,请在一个主键列指定 TIMESERIES 关键字,如下所示:
ALTER TABLE <full_table_name> ADD CONSTRAINT <pk_name> PRIMARY KEY(pk_col1 TIMESERIES, pk_col2, ...)
在表上添加主键约束后,该表将显示在特征 UI 中,并且可以用作特征表。
使用由 Delta Live Tables 管道创建的流式处理表或具体化视图作为特征表
Unity Catalog 中具有主键的任何流式处理表或具体化视图都可以是 Unity Catalog 中的特征表,可以将特征 UI 和 API 与表配合使用。
注意
若要为现有流式处理表或具体化视图设置主键,请更新负责管理对象的笔记本中的流式处理表或具体化视图的架构。 然后,刷新表以更新 Unity Catalog 对象。
将主键添加到具体化视图的语法如下所示:
Databricks SQL
CREATE OR REFRESH MATERIALIZED VIEW existing_live_table(
id int NOT NULL PRIMARY KEY,
...
) AS SELECT ...
Python
import dlt
@dlt.table(
schema="""
id int NOT NULL PRIMARY KEY,
...
"""
)
def existing_live_table():
return ...
更新 Unity Catalog 中的特征表
可以添加新特征或基于主键修改特定的行,从而更新 Unity Catalog 中的特征表。
不应更新以下特征表元数据:
- 主密钥。
- 分区键。
- 现有特征的名称或数据类型。
更改它们将导致将特征用于训练和服务模型的下游管道中断。
将新特征添加到 Unity Catalog 中的现有特征表
可通过以下两种方式之一将新特征添加到现有的特征表:
- 更新现有的特征计算函数,并使用返回的数据帧作为参数来运行
write_table
。 这会更新特征表架构,并基于主键合并新的特征值。 - 创建新的特征计算函数来计算新的特征值。 此新计算函数返回的 DataFrame 必须包含特征表的主键和分区键(如果已定义)。 使用数据帧作为参数来运行
write_table
,以使用同一主键将新特征写入现有的特征表。
仅更新特征表中的特定行
在 write_table
中使用 mode = "merge"
。 write_table
调用中发送的数据帧内不存在其主键的行将保持不变。
from databricks.feature_engineering import FeatureEngineeringClient
fe = FeatureEngineeringClient()
fe.write_table(
name='ml.recommender_system.customer_features',
df = customer_features_df,
mode = 'merge'
)
计划一个作业来更新特征表
为确保特征表中的特征始终采用最新值,Databricks 建议创建一个作业,以便定期(例如每天)运行某个笔记本来更新特征表。 如果你已创建一个非计划作业,可将其转换为计划作业,以确保特征值始终是最新的。
用于更新特征表的代码使用 mode='merge'
,如以下示例所示。
from databricks.feature_engineering import FeatureEngineeringClient
fe = FeatureEngineeringClient()
customer_features_df = compute_customer_features(data)
fe.write_table(
df=customer_features_df,
name='ml.recommender_system.customer_features',
mode='merge'
)
存储每日特征的既往值
定义一个带有复合主键的特征表。 在主键中包含日期。 例如,对于特征表 customer_features
,可以使用复合主键 (date
, customer_id
) 和分区键 date
实现高效的读取。
Databricks 建议在表上启用 Liquid 聚类分析,以实现高效的读取。 如果不使用 Liquid 聚类分析,请将日期列设置为分区键,以提高读取性能。
Databricks SQL
CREATE TABLE ml.recommender_system.customer_features (
customer_id int NOT NULL,
`date` date NOT NULL,
feat1 long,
feat2 varchar(100),
CONSTRAINT customer_features_pk PRIMARY KEY (`date`, customer_id)
)
-- If you are not using liquid clustering, uncomment the following line.
-- PARTITIONED BY (`date`)
COMMENT "Customer features";
Python
from databricks.feature_engineering import FeatureEngineeringClient
fe = FeatureEngineeringClient()
fe.create_table(
name='ml.recommender_system.customer_features',
primary_keys=['date', 'customer_id'],
# If you are not using liquid clustering, uncomment the following line.
# partition_columns=['date'],
schema=customer_features_df.schema,
description='Customer features'
)
然后,可以创建代码以从特征表中读取数据并将 date
筛选到所需的时间段。
还可以创建一个时序特征表,在使用 create_training_set
或 score_batch
时启用时间点查找。 请参阅在 Unity Catalog 中创建特征表。
为使特征表保持最新,请设置按计划定期运行的作业以写入特征,或将新特征值流式传输到特征表。
创建流特征计算管道来更新特征
若要创建流式处理特征计算管道,请将流式处理 DataFrame
作为参数传递给 write_table
。 此方法将返回 StreamingQuery
对象。
def compute_additional_customer_features(data):
''' Returns Streaming DataFrame
'''
pass
from databricks.feature_engineering import FeatureEngineeringClient
fe = FeatureEngineeringClient()
customer_transactions = spark.readStream.load("dbfs:/events/customer_transactions")
stream_df = compute_additional_customer_features(customer_transactions)
fe.write_table(
df=stream_df,
name='ml.recommender_system.customer_features',
mode='merge'
)
从 Unity Catalog 中的特征表读取
使用 read_table
读取特征值。
from databricks.feature_engineering import FeatureEngineeringClient
fe = FeatureEngineeringClient()
customer_features_df = fe.read_table(
name='ml.recommender_system.customer_features',
)
搜索和浏览 Unity Catalog 中的特征表
使用特征 UI 搜索或浏览 Unity Catalog 中的特征表。
点击侧边栏中的 “功能”以显示功能 UI。
使用目录选择器选择目录以查看该目录中的所有可用特征表。 在搜索框中,输入特征表、特征或注释的完整或部分名称。 你也可以输入标记的全部或部分键或值。 搜索文本不区分大小写。
获取 Unity Catalog 中特征表的元数据
使用get_table
获取特征表元数据。
from databricks.feature_engineering import FeatureEngineeringClient
fe = FeatureEngineeringClient()
ft = fe.get_table(name="ml.recommender_system.user_feature_table")
print(ft.features)
在 Unity Catalog 中将标记与特征表和特征配合使用
可以使用标记(简单的键值对)对特征表和特征进行分类和管理。
对于要素表,可以使用目录资源管理器、笔记本或 SQL 查询编辑器中的 SQL 语句或特征工程 Python API 创建、编辑和删除标记。
对于功能,可以在笔记本或 SQL 查询编辑器中使用目录资源管理器或 SQL 语句创建、编辑和删除标记。
请参阅将标记应用于 Unity Catalog 安全对象和特征工程和工作区特征存储 Python API。
以下示例演示如何使用特征工程 Python API 创建、更新和删除特征表标记。
from databricks.feature_engineering import FeatureEngineeringClient
fe = FeatureEngineeringClient()
# Create feature table with tags
customer_feature_table = fe.create_table(
# ...
tags={"tag_key_1": "tag_value_1", "tag_key_2": "tag_value_2", ...},
# ...
)
# Upsert a tag
fe.set_feature_table_tag(name="customer_feature_table", key="tag_key_1", value="new_key_value")
# Delete a tag
fe.delete_feature_table_tag(name="customer_feature_table", key="tag_key_2")
删除 Unity Catalog 中的特征表
可以使用目录资源管理器或使用特征工程存储 Python API 直接删除 Unity Catalog 中的 Delta 表,从而删除 Unity Catalog 中的特征表。
注意
- 删除特征表可能会导致上游创建者和下游使用者(模型、终结点和计划作业)出现意外故障。 必须在云提供商的配合下删除发布的联机存储。
- 删除 Unity Catalog 中的特征表时,也会删除基础增量表。
- Databricks Runtime 13.1 ML 或更低版本不支持
drop_table
。 使用 SQL 命令删除表。
可以使用 Databricks SQL 或 FeatureEngineeringClient.drop_table
删除 Unity Catalog 中的特征表:
Databricks SQL
DROP TABLE ml.recommender_system.customer_features;
Python
from databricks.feature_engineering import FeatureEngineeringClient
fe = FeatureEngineeringClient()
fe.drop_table(
name='ml.recommender_system.customer_features'
)
在 Unity Catalog 中跨工作区或帐户共享功能表
Unity Catalog 中的功能表可供分配给表的 Unity Catalog 元存储的所有工作区访问。
若要与未分配给同一 Unity Catalog 元存储的工作区共享功能表,请使用增量共享。