将特征发布到联机特征存储

本文介绍如何将特征发布到联机特征存储以提供实时服务。

Databricks 特征存储支持以下联机存储:

联机存储提供者 使用 Unity Catalog 中的特征工程进行发布 使用工作区特征存储进行发布 旧版 MLflow 模型服务中的特征查找 模型服务中的特征查找
Azure Cosmos DB [1] X X(特征存储客户端 v0.5.0 及更高版本) X X
Azure MySQL(单一服务器) X X
Azure SQL Server X

Cosmos DB 兼容性说明

本部分包含将 Databricks 特征存储与 Cosmos DB 配合使用时要记住的一些重要事项。

已启用 Unity Catalog 的工作区

在 Databricks Runtime 12.2 LTS ML 及更低版本中,Cosmos DB 在线存储提供程序与已启用 Unity Catalog 的工作区不兼容。 Unity Catalog 和官方 Cosmos DB Spark 连接器都会修改 Spark 目录。 在运行 Databricks Runtime 12.2 LTS ML 或更低版本的群集上从已启用 Unity Catalog 的工作区将特征发布到 Cosmos DB 时,可能存在写入冲突,导致特征存储发布到 Cosmos DB 失败。

若要在已启用 Unity Catalog 的工作区中使用 Cosmos DB,你必须使用运行 Databricks Runtime 13.0 ML 或更高版本的群集,或者使用运行 Databricks Runtime 11.3 LTS ML 或更高版本且群集策略为“不受限”或“共享计算”的群集。

Spark 连接器

若要使用 Azure Cosmos DB,必须使用 Core (SQL) API 创建帐户,并且网络连接方法必须设置为“所有网络”。 群集上必须安装相应的适用于 SQL API 的 Azure Cosmos DB Spark 3 OLTP 连接器。 Databricks 建议安装适用于 Spark 3.2 的最新连接器版本,直到发布适用于 Spark 3.3 的连接器。

请勿手动创建数据库或容器 - 使用 publish_table()

Cosmos DB 联机存储使用与脱机存储不同的架构。 具体来说,在联机存储中,主键作为组合键存储在 _feature_store_internal__primary_keys 列中。

为确保特征存储可以访问 Cosmos DB 联机存储,你必须使用 publish_table() 在联机存储中创建表。 请勿在 Cosmos DB 中手动创建数据库或容器。 publish_table() 自动为你执行该操作。

将批量计算的特征发布到联机存储

可以创建并计划 Azure Databricks 作业以定期发布更新的特征。 此作业还可以包含用于计算更新特征的代码,或者你可以创建并运行单独的作业来计算和发布特征更新。

对于 SQL 存储,以下代码假设名为“recommender_system”的联机数据库已存在于联机存储中,并与脱机存储的名称相匹配。 如果数据库中没有名为“customer_features”的表,此代码将创建此表。 此代码还假设每天计算特征并将其存储为分区列 _dt

以下代码假设你已创建用于访问此联机存储的机密。

Cosmos DB

Cosmos DB 的支持适用于 Unity Catalog 客户端的所有特征工程版本,以及特征存储客户端 v0.5.0 及更高版本。

import datetime
from databricks.feature_engineering.online_store_spec import AzureCosmosDBSpec
# or databricks.feature_store.online_store_spec for Workspace Feature Store
online_store = AzureCosmosDBSpec(
  account_uri='<account-uri>',
  read_secret_prefix='<read-scope>/<prefix>',
  write_secret_prefix='<write-scope>/<prefix>'
)

fe.publish_table( # or fs.publish_table for Workspace Feature Store
  name='ml.recommender_system.customer_features',
  online_store=online_store,
  filter_condition=f"_dt = '{str(datetime.date.today())}'",
  mode='merge'
)

SQL 存储

import datetime
from databricks.feature_engineering.online_store_spec import AzureMySqlSpec
# or databricks.feature_store.online_store_spec for Workspace Feature Store
online_store = AzureMySqlSpec(
  hostname='<hostname>',
  port='<port>',
  read_secret_prefix='<read-scope>/<prefix>',
  write_secret_prefix='<write-scope>/<prefix>'
)

fs.publish_table(
  name='recommender_system.customer_features',
  online_store=online_store,
  filter_condition=f"_dt = '{str(datetime.date.today())}'",
  mode='merge'
)

将流特征发布到联机存储

若要持续将特征流式传输到联机存储,请设置 streaming=True

fe.publish_table( # or fs.publish_table for Workspace Feature Store
  name='ml.recommender_system.customer_features',
  online_store=online_store,
  streaming=True
)

将选定的特征发布到联机存储

若要仅将选定的特征发布到联机存储,请使用 features 参数指定要发布的特征名称。 始终会发布主键和时间戳键。 如果你不指定 features 参数,或者值为 None,则会发布脱机特征表中的所有特征。

注意

即使仅将一部分功能发布到在线商店,整个脱机表也必须是有效的功能表。 如果脱机表包含不支持的数据类型,则无法将一部分功能从该表发布到在线商店。

fe.publish_table( # or fs.publish_table for Workspace Feature Store
  name='ml.recommender_system.customer_features',
  online_store=online_store,
  features=["total_purchases_30d"]
)

将特征表发布到特定的数据库

联机存储规范中,指定数据库名称 (database_name) 和表名称 (table_name)。 如果你不指定这些参数,则会使用脱机数据库名称和特征表名称。 database_name 必须已存在于联机存储中。

online_store = AzureMySqlSpec(
  hostname='<hostname>',
  port='<port>',
  database_name='<database-name>',
  table_name='<table-name>',
  read_secret_prefix='<read-scope>/<prefix>',
  write_secret_prefix='<write-scope>/<prefix>'
)

覆盖现有的联机特征表或特定行

publish_table 调用中使用 mode='overwrite'。 联机表将由脱机表中的数据完全覆盖。

注意

Azure Cosmos DB 不支持覆盖模式。

fs.publish_table(
  name='recommender_system.customer_features',
  online_store=online_store,
  mode='overwrite'
)

若要仅覆盖特定的行,请使用 filter_condition 参数:

fs.publish_table(
  name='recommender_system.customer_features',
  online_store=online_store,
  filter_condition=f"_dt = '{str(datetime.date.today())}'",
  mode='merge'
)

从联机存储中删除已发布的表

在特征存储客户端 v0.12.0 及更高版本中,可以使用 drop_online_table 从联机存储中删除已发布的表。 使用 drop_online_table 删除已发布的表时,将从联机存储提供程序中删除该表,并从 Databricks 中删除联机存储元数据。

fe.drop_online_table( # or fs.drop_online_table for Workspace Feature Store
  name='recommender_system.customer_features',
  online_store = online_store
)

注意

  • drop_online_table 从联机存储中删除已发布的表。 它不会删除 Databricks 中的特征表。
  • 在删除已发布的表之前,应确保该表未用于模型服务特征查找,并且没有其他下游依赖项。 删除操作不可逆,可能会导致依赖项失败。
  • 若要检查任何依赖项,请在执行 drop_online_table 之前的一天,考虑为你要删除的已发布表轮换密钥。