Unity Catalog 中的特征工程

本页介绍了如何创建和使用 Unity Catalog 中的特征表。

此页面仅适用于为 Unity Catalog 启用的工作区。 如果你的工作区未启用 Unity Catalog,请参阅使用工作区特征存储中的特征

要求

Unity Catalog 中的特征工程需要 Databricks Runtime 13.2 ML 或更高版本。 此外,Unity Catalog 元存储必须具有特权模型版本 1.0

在 Unity Catalog Python 客户端中安装特征工程

Unity Catalog 中的特征工程提供了 Python 客户端 FeatureEngineeringClient。 该类在 PyPI 上的 databricks-feature-engineering 包中提供,并且预安装在 Databricks Runtime 13.2 ML 及更高版本中。 如果使用的是一个非 ML Databricks Runtime,则需要手动安装客户端。 使用兼容性矩阵可找到适用于你的 Databricks Runtime 版本的正确版本。

%pip install databricks-feature-engineering

dbutils.library.restartPython()

创建 Unity Catalog 中特征表的目录和架构

必须新建目录或使用现有目录作为特征表。

要新建目录,必须具有对元存储CREATE CATALOG特权。

CREATE CATALOG IF NOT EXISTS <catalog-name>

要使用现有目录,必须具有对目录的USE CATALOG特权。

USE CATALOG <catalog-name>

Unity Catalog 中的特征表必须存储在架构中。 要在目录中新建架构,必须具有对目录的CREATE SCHEMA特权。

CREATE SCHEMA IF NOT EXISTS <schema-name>

创建 Unity Catalog 中的特征表

注意

可以使用 Unity Catalog 中的现有 Delta 表作为特征表。 请参阅使用 Unity Catalog 中的现有 Delta 表作为特征表

还可以使用 Unity Catalog 中的现有增量实时表作为特征表。 请参阅使用由 Delta Live Tables 管道创建的现有流式处理表或具体化视图作为特征表

Unity Catalog 中的特征表是由 Unity Catalog 管理的 Delta 表增量实时表。 特征表必须具有主键。 Unity Catalog 中特征表的名称具有三级结构<catalog-name>.<schema-name>.<table-name>

可以使用 Databricks SQL 或 Python FeatureEngineeringClient 在 Unity Catalog 中创建特征 Delta 表。 可以使用增量实时表管道在 Unity Catalog 中创建功能增量实时表。

Databricks sql

可以使用具有主键约束的 Delta 表作为特征表。 下面的代码演示如何使用主键创建表:

CREATE TABLE ml.recommender_system.customer_features (
  customer_id int NOT NULL,
  feat1 long,
  feat2 varchar(100),
  CONSTRAINT customer_features_pk PRIMARY KEY (customer_id)
);

若要创建时序特征表,请将时间列添加为主键列,并指定 TIMESERIES 关键字。 TIMESERIES 关键字需要 Databricks Runtime 13.3 LTS 或更高版本。

CREATE TABLE ml.recommender_system.customer_features (
  customer_id int NOT NULL,
  ts timestamp NOT NULL,
  feat1 long,
  feat2 varchar(100),
  CONSTRAINT customer_features_pk PRIMARY KEY (customer_id, ts TIMESERIES)
);

创建表后,可以像写入其他 Delta 表一样将数据写入其中,可以将其用作特征表。

Python

有关以下示例中使用的命令和参数的详细信息,请参阅特征工程 Python API 参考

  1. 编写 Python 函数来计算特征。 每个函数的输出应是具有唯一主键的 Apache Spark 数据帧。 主键可由一个或多个列组成。
  2. 实例化FeatureEngineeringClient并使用create_table以创建特征表。
  3. 使用 write_table 填充特征表。
from databricks.feature_engineering import FeatureEngineeringClient

fe = FeatureEngineeringClient()

# Prepare feature DataFrame
def compute_customer_features(data):
  ''' Feature computation code returns a DataFrame with 'customer_id' as primary key'''
  pass

customer_features_df = compute_customer_features(df)

# Create feature table with `customer_id` as the primary key.
# Take schema from DataFrame output by compute_customer_features
customer_feature_table = fe.create_table(
  name='ml.recommender_system.customer_features',
  primary_keys='customer_id',
  schema=customer_features_df.schema,
  description='Customer features'
)

# An alternative is to use `create_table` and specify the `df` argument.
# This code automatically saves the features to the underlying Delta table.

# customer_feature_table = fe.create_table(
#  ...
#  df=customer_features_df,
#  ...
# )

# To use a composite primary key, pass all primary key columns in the create_table call

# customer_feature_table = fe.create_table(
#   ...
#   primary_keys=['customer_id', 'date'],
#   ...
# )

# To create a time series table, set the timeseries_columns argument

# customer_feature_table = fe.create_table(
#   ...
#   primary_keys=['customer_id', 'date'],
#   timeseries_columns='date',
#   ...
# )

增量实时表

注意

增量实时表上的表约束处于公共预览状态。 以下代码示例必须使用增量实时表预览频道运行。

可以使用具有主键约束的增量实时表作为特征表。 下面的代码演示如何使用主键创建增量实时表:

CREATE LIVE TABLE customer_features (
  customer_id int NOT NULL,
  feat1 long,
  feat2 varchar(100),
  CONSTRAINT customer_features_pk PRIMARY KEY (customer_id)
) AS SELECT * FROM ...;

若要创建时序特征表,请将时间列添加为主键列,并指定 TIMESERIES 关键字。

CREATE LIVE TABLE customer_features (
  customer_id int NOT NULL,
  ts timestamp NOT NULL,
  feat1 long,
  feat2 varchar(100),
  CONSTRAINT customer_features_pk PRIMARY KEY (customer_id, ts TIMESERIES)
) AS SELECT * FROM ...;

创建表后,可以像写入其他增量实时表一样将数据写入其中,可以将其用作特征表。

增量实时表的表约束仅在 SQL 中受支持。 若要为 Python 中声明的 Delta Live Tables 设置主键,请参阅使用由 Delta Live Tables 管道创建的现有流式处理表或具体化视图作为特征表

使用 Unity Catalog 中的现有 Delta 表作为特征表

Unity Catalog 中具有主键的任何 Delta 表都可以是 Unity Catalog 中的特征表,可以将特征 UI 和 API 与表配合使用。

注意

  • 只有表所有者才能声明主键约束。 所有者的姓名显示在目录资源管理器的表详细信息页面。
  • 验证 Unity 目录中的特征工程是否支持 Delta 表中的数据类型。 请参阅支持的数据类型
  • TIMESERIES 关键字需要 Databricks Runtime 13.3 LTS 或更高版本。

如果现有 Delta 表没有主键约束,则可以按如下所示创建一个:

  1. 将主键列设置为 NOT NULL。 对于每个主键列,请运行:

    ALTER TABLE <full_table_name> ALTER COLUMN <pk_col_name> SET NOT NULL
    
  2. 更改表以添加主键约束:

    ALTER TABLE <full_table_name> ADD CONSTRAINT <pk_name> PRIMARY KEY(pk_col1, pk_col2, ...)
    

    pk_name是主键约束的名称。 按照约定,可以使用带_pk后缀的表名(无架构和目录)。 例如,名称为"ml.recommender_system.customer_features"的表使用customer_features_pk作为其主键约束的名称。

    若要使表成为时序特征表,请在一个主键列指定 TIMESERIES 关键字,如下所示:

    ALTER TABLE <full_table_name> ADD CONSTRAINT <pk_name> PRIMARY KEY(pk_col1 TIMESERIES, pk_col2, ...)
    

    在表上添加主键约束后,该表将显示在特征 UI 中,并且可以将其用作特征表。

控制对 Unity Catalog 中特征表的访问

Unity Catalog 中特征表的访问控制由 Unity Catalog 管理。 请参阅Unity Catalog 特权

更新 Unity Catalog 中的特征表

可以添加新特征或基于主键修改特定的行,从而更新 Unity Catalog 中的特征表。

不应更新以下特征表元数据:

  • 主密钥。
  • 分区键。
  • 现有特征的名称或数据类型。

更改它们将导致将特征用于训练和服务模型的下游管道中断。

将新特征添加到 Unity Catalog 中的现有特征表

可通过以下两种方式之一将新特征添加到现有的特征表:

  • 更新现有的特征计算函数,并使用返回的数据帧作为参数来运行 write_table。 这会更新特征表架构,并基于主键合并新的特征值。
  • 创建新的特征计算函数来计算新的特征值。 此新计算函数返回的数据帧必须包含特征表的主键和分区键(如果已定义)。 使用数据帧作为参数来运行 write_table,以使用同一主键将新特征写入现有的特征表。

仅更新特征表中的特定行

write_table 中使用 mode = "merge"write_table 调用中发送的数据帧内不存在其主键的行将保持不变。

from databricks.feature_engineering import FeatureEngineeringClient
fe = FeatureEngineeringClient()
fe.write_table(
  name='ml.recommender_system.customer_features',
  df = customer_features_df,
  mode = 'merge'
)

计划一个作业来更新特征表

为确保特征表中的特征始终采用最新值,Databricks 建议创建一个作业,以便定期(例如每天)运行某个笔记本来更新特征表。 如果你已创建一个非计划作业,可将其转换为计划作业,以确保特征值始终是最新的。

用于更新特征表的代码使用 mode='merge',如以下示例所示。

from databricks.feature_engineering import FeatureEngineeringClient
fe = FeatureEngineeringClient()

customer_features_df = compute_customer_features(data)

fe.write_table(
  df=customer_features_df,
  name='ml.recommender_system.customer_features',
  mode='merge'
)

存储每日特征的既往值

定义一个带有复合主键的特征表。 在主键中包含日期。 例如,对于特征表 customer_features,可以使用复合主键 (date, customer_id) 和分区键 date 实现高效的读取。

Databricks sql

CREATE TABLE ml.recommender_system.customer_features (
  customer_id int NOT NULL,
  `date` date NOT NULL,
  feat1 long,
  feat2 varchar(100),
  CONSTRAINT customer_features_pk PRIMARY KEY (`date`, customer_id)
)
PARTITIONED BY (`date`)
COMMENT "Customer features";

Python

from databricks.feature_engineering import FeatureEngineeringClient
fe = FeatureEngineeringClient()
fe.create_table(
  name='ml.recommender_system.customer_features',
  primary_keys=['date', 'customer_id'],
  partition_columns=['date'],
  schema=customer_features_df.schema,
  description='Customer features'
)

然后,可以创建代码以从特征表中读取数据并将 date 筛选到所需的时间段。

还可以创建一个时序特征表,在使用 create_training_setscore_batch 时启用时间点查找。 请参阅在 Unity Catalog 中创建特征表

为使特征表保持最新,请设置按计划定期运行的作业以写入特征,或将新特征值流式传输到特征表。

创建流特征计算管道来更新特征

若要创建流式处理特征计算管道,请将流式处理 DataFrame 作为参数传递给 write_table。 此方法将返回 StreamingQuery 对象。

def compute_additional_customer_features(data):
  ''' Returns Streaming DataFrame
  '''
  pass

from databricks.feature_engineering import FeatureEngineeringClient
fe = FeatureEngineeringClient()

customer_transactions = spark.readStream.load("dbfs:/events/customer_transactions")
stream_df = compute_additional_customer_features(customer_transactions)

fe.write_table(
  df=stream_df,
  name='ml.recommender_system.customer_features',
  mode='merge'
)

从 Unity Catalog 中的特征表读取

使用 read_table 读取特征值。

from databricks.feature_engineering import FeatureEngineeringClient
fe = FeatureEngineeringClient()
customer_features_df = fe.read_table(
  name='ml.recommender_system.customer_features',
)

搜索和浏览 Unity Catalog 中的特征表

使用特征 UI 搜索或浏览 Unity Catalog 中的特征表。

  1. 点击侧边栏中的“特征存储图标”“特征”以显示特征 UI。

  2. 使用目录选择器选择目录以查看该目录中的所有可用特征表。 在搜索框中,输入特征表、特征或注释的完整或部分名称。 你也可以输入标记的全部或部分键或值。 搜索文本不区分大小写。

    特征搜索示例

获取 Unity Catalog 中特征表的元数据

使用get_table获取特征表元数据。

from databricks.feature_engineering import FeatureEngineeringClient
fe = FeatureEngineeringClient()
ft = fe.get_table(name="ml.recommender_system.user_feature_table")
print(ft.features)

在 Unity Catalog 中将标记与特征表和特征配合使用

可以使用标记(简单的键值对)对特征表和特征进行分类和管理。

对于特征表,可以使用目录资源管理器 UI Databricks SQL特征工程 Python API 创建、 编辑和删除标记。

对于特征,可以使用目录资源管理器 UIDatabricks SQL 创建、编辑和删除标记。

以下示例演示如何使用特征工程 Python API 创建、更新和删除特征表标记。

from databricks.feature_engineering import FeatureEngineeringClient
fe = FeatureEngineeringClient()

# Create feature table with tags
customer_feature_table = fe.create_table(
  # ...
  tags={"tag_key_1": "tag_value_1", "tag_key_2": "tag_value_2", ...},
  # ...
)

# Upsert a tag
fe.set_feature_table_tag(name="customer_feature_table", key="tag_key_1", value="new_key_value")

# Delete a tag
fe.delete_feature_table_tag(name="customer_feature_table", key="tag_key_2")

删除 Unity Catalog 中的特征表

可以使用目录资源管理器或使用特征工程存储 Python API 直接删除 Unity Catalog 中的 Delta 表,从而删除 Unity Catalog 中的特征表。

注意

  • 删除特征表可能会导致上游创建者和下游使用者(模型、终结点和计划作业)出现意外故障。 必须在云提供商的配合下删除发布的联机存储。
  • 删除 Unity Catalog 中的特征表时,也会删除基础增量表。
  • Databricks Runtime 13.1 ML 或更低版本不支持drop_table。 使用 SQL 命令删除表。

可以使用 Databricks SQL 或 FeatureEngineeringClient.drop_table 删除 Unity Catalog 中的特征表:

Databricks sql

DROP TABLE ml.recommender_system.customer_features;

Python

from databricks.feature_engineering import FeatureEngineeringClient
fe = FeatureEngineeringClient()
fe.drop_table(
  name='ml.recommender_system.customer_features'
)