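Train models using features
This article describes how you can train models using Feature Engineering in Unity Catalog or the local Workspace Feature Store. You must first create a training dataset, which defines the features to use and how to join them. Then, when you train a model, the model retains references to the features.
When you train a model using Feature Engineering in Unity Catalog, you can view the model's lineage in Catalog Explorer. Tables and functions that were used to create the model are automatically tracked and displayed. See Feature governance and lineage.
When you use the model for inference, you can choose to have it retrieve feature values from the feature store. Feature store models are also compatible with the MLflow pyfunc interface, so you can use MLflow to perform batch inference with feature tables.
A model can use at most 50 tables and 100 functions for training.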
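Create a training dataset
To select specific features from a feature table for model training, you create a training dataset using the FeatureEngineeringClient.create_training_set (for Feature Engineering in Unity Catalog) or FeatureStoreClient.create_training_set (for Workspace Feature Store) API and an object called a FeatureLookup. A FeatureLookup specifies each feature to use in the training set, including the name of the feature table, the name(s) of the features, and the key(s) to use when joining the feature table with the DataFrame passed to create_training_set. See Feature Lookup for more information.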
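Use the feature_names parameter when you create a FeatureLookup. feature_names takes a single feature name, a list of feature names, or None to look up all features (excluding primary keys) in the feature table at the time that the training set is created.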
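To make the three accepted forms of feature_names concrete, here is a small plain-Python sketch of how each form could resolve to a list of feature columns. The helper resolve_feature_names is hypothetical and is not part of the Databricks client libraries. It only models the rule stated above, and it assumes the table's columns and primary keys are already known.

```python
from typing import List, Sequence, Union


def resolve_feature_names(
    feature_names: Union[str, Sequence[str], None],
    table_columns: Sequence[str],
    primary_keys: Sequence[str],
) -> List[str]:
    """Hypothetical sketch: which feature columns a lookup would select."""
    if feature_names is None:
        # None: every column of the feature table except its primary keys.
        return [c for c in table_columns if c not in primary_keys]
    if isinstance(feature_names, str):
        # A single name behaves like a one-element list.
        return [feature_names]
    # A list (or other sequence) of names is used as given.
    return list(feature_names)


# Usage with an illustrative column layout for customer_features.
columns = ["customer_id", "total_purchases_30d", "total_purchases_7d"]
print(resolve_feature_names(None, columns, ["customer_id"]))
# prints ['total_purchases_30d', 'total_purchases_7d']
```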
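Note
The type and order of the lookup_key columns in that DataFrame must match the type and order of the primary keys (excluding timestamp keys) of the referenced feature table.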
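The matching rule in the note is positional: the first lookup_key column pairs with the first primary key, and so on. The hypothetical helper check_lookup_keys below sketches that check in plain Python. For illustration it represents column types as simple strings, which is an assumption. It does not reproduce how the client validates keys and is only a way to reason about the rule.

```python
from typing import Dict, List, Sequence


def check_lookup_keys(
    df_types: Dict[str, str],
    lookup_key: Sequence[str],
    primary_keys: Sequence[str],
    pk_types: Dict[str, str],
    timestamp_keys: Sequence[str] = (),
) -> List[str]:
    """Hypothetical sketch; returns a list of problems (empty means OK)."""
    # Timestamp keys are excluded from the positional match.
    keys = [k for k in primary_keys if k not in timestamp_keys]
    if len(lookup_key) != len(keys):
        return [f"expected {len(keys)} lookup key(s), got {len(lookup_key)}"]
    problems = []
    # The i-th lookup_key column is paired with the i-th primary key.
    for col, pk in zip(lookup_key, keys):
        if col not in df_types:
            problems.append(f"column {col!r} is missing from the DataFrame")
        elif df_types[col] != pk_types[pk]:
            problems.append(f"{col!r} is {df_types[col]} but {pk!r} is {pk_types[pk]}")
    return problems


df_types = {"cid": "int", "transaction_dt": "date"}
pk_types = {"customer_id": "int", "dt": "date"}
print(check_lookup_keys(df_types, ["cid", "transaction_dt"], ["customer_id", "dt"], pk_types))  # []
# Swapping the order makes both positions mismatch in type.
print(check_lookup_keys(df_types, ["transaction_dt", "cid"], ["customer_id", "dt"], pk_types))
```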
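This article includes code examples for both versions of the syntax.
In this example, the DataFrame returned by training_set.load_df() contains a column for each feature in feature_lookups. It preserves all columns of the DataFrame provided to create_training_set except those excluded with exclude_columns.
Feature Engineering in Unity Catalog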
from databricks.feature_engineering import FeatureEngineeringClient, FeatureLookup
# The model training uses two features from the 'customer_features' feature table and
# a single feature from 'product_features'
feature_lookups = [
    FeatureLookup(
        table_name='ml.recommender_system.customer_features',
        feature_names=['total_purchases_30d', 'total_purchases_7d'],
        lookup_key='customer_id'
    ),
    FeatureLookup(
        table_name='ml.recommender_system.product_features',
        feature_names=['category'],
        lookup_key='product_id'
    )
]
fe = FeatureEngineeringClient()
# Create a training set using training DataFrame and features from Feature Store
# The training DataFrame must contain all lookup keys from the set of feature lookups,
# in this case 'customer_id' and 'product_id'. It must also contain all labels used
# for training, in this case 'rating'.
training_set = fe.create_training_set(
    df=training_df,
    feature_lookups=feature_lookups,
    label='rating',
    exclude_columns=['customer_id', 'product_id']
)
training_df = training_set.load_df()
Workspace Feature Store
from databricks.feature_store import FeatureLookup, FeatureStoreClient
# The model training uses two features from the 'customer_features' feature table and
# a single feature from 'product_features'
feature_lookups = [
    FeatureLookup(
        table_name='recommender_system.customer_features',
        feature_names=['total_purchases_30d', 'total_purchases_7d'],
        lookup_key='customer_id'
    ),
    FeatureLookup(
        table_name='recommender_system.product_features',
        feature_names=['category'],
        lookup_key='product_id'
    )
]
fs = FeatureStoreClient()
# Create a training set using training DataFrame and features from Feature Store
# The training DataFrame must contain all lookup keys from the set of feature lookups,
# in this case 'customer_id' and 'product_id'. It must also contain all labels used
# for training, in this case 'rating'.
training_set = fs.create_training_set(
    df=training_df,
    feature_lookups=feature_lookups,
    label='rating',
    exclude_columns=['customer_id', 'product_id']
)
training_df = training_set.load_df()
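The column rule described for load_df() can be written as a short sketch. The helper expected_training_columns is hypothetical and works on column names only. It does not model column order, which this article does not specify. Use it as a quick mental check of what the returned DataFrame should contain.

```python
from typing import List, Sequence


def expected_training_columns(
    df_columns: Sequence[str],
    feature_columns: Sequence[str],
    exclude_columns: Sequence[str] = (),
) -> List[str]:
    """Hypothetical sketch of the column set returned by load_df().

    Input DataFrame columns plus one column per looked-up feature, minus
    anything listed in exclude_columns. Column order is not modeled.
    """
    excluded = set(exclude_columns)
    kept = [c for c in df_columns if c not in excluded]
    return kept + [f for f in feature_columns if f not in excluded]


cols = expected_training_columns(
    df_columns=["customer_id", "product_id", "rating"],
    feature_columns=["total_purchases_30d", "total_purchases_7d", "category"],
    exclude_columns=["customer_id", "product_id"],
)
print(sorted(cols))  # ['category', 'rating', 'total_purchases_30d', 'total_purchases_7d']
```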
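Create a training set when lookup keys do not match the primary keys
Use the lookup_key argument in the FeatureLookup to give the column name(s) in the training set. create_training_set performs an ordered join between the training set columns specified in the lookup_key argument and the feature table's primary keys, using the order in which the primary keys were specified when the feature table was created.
In this example, recommender_system.customer_features has the following primary keys: customer_id, dt.
The recommender_system.product_features feature table has primary key product_id.
If training_df has the following columns:
- cid
- transaction_dt
- product_id
- rating
then the following code creates the correct feature lookups for the TrainingSet:
Feature Engineering in Unity Catalog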
feature_lookups = [
    FeatureLookup(
        table_name='ml.recommender_system.customer_features',
        feature_names=['total_purchases_30d', 'total_purchases_7d'],
        lookup_key=['cid', 'transaction_dt']
    ),
    FeatureLookup(
        table_name='ml.recommender_system.product_features',
        feature_names=['category'],
        lookup_key='product_id'
    )
]
Workspace Feature Store
feature_lookups = [
    FeatureLookup(
        table_name='recommender_system.customer_features',
        feature_names=['total_purchases_30d', 'total_purchases_7d'],
        lookup_key=['cid', 'transaction_dt']
    ),
    FeatureLookup(
        table_name='recommender_system.product_features',
        feature_names=['category'],
        lookup_key='product_id'
    )
]
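When create_training_set is called, it creates a training dataset by performing a left join of recommender_system.customer_features and training_df using the keys (customer_id, dt), which correspond to (cid, transaction_dt). It then performs a second left join with recommender_system.product_features on product_id. The following code shows the equivalent joins:
Feature Engineering in Unity Catalog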
customer_features_df = spark.sql("SELECT * FROM ml.recommender_system.customer_features")
product_features_df = spark.sql("SELECT * FROM ml.recommender_system.product_features")
training_df.join(
    customer_features_df,
    on=[training_df.cid == customer_features_df.customer_id,
        training_df.transaction_dt == customer_features_df.dt],
    how="left"
).join(
    product_features_df,
    on="product_id",
    how="left"
)
Workspace Feature Store
customer_features_df = spark.sql("SELECT * FROM recommender_system.customer_features")
product_features_df = spark.sql("SELECT * FROM recommender_system.product_features")
training_df.join(
    customer_features_df,
    on=[training_df.cid == customer_features_df.customer_id,
        training_df.transaction_dt == customer_features_df.dt],
    how="left"
).join(
    product_features_df,
    on="product_id",
    how="left"
)
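The ordered join can also be sketched without Spark. The hypothetical function left_join_on_keys below indexes a feature table by its primary-key tuple and pairs lookup_key[i] with primary_keys[i] by position, which is the rule described above. Rows of the training data that have no match keep their own columns and get None for each feature. The function uses plain Python lists of dicts and made-up rows in place of DataFrames, purely for illustration.

```python
from typing import Any, Dict, List, Sequence

Row = Dict[str, Any]


def left_join_on_keys(
    left: List[Row],
    right: List[Row],
    lookup_key: Sequence[str],
    primary_keys: Sequence[str],
    feature_names: Sequence[str],
) -> List[Row]:
    """Hypothetical sketch: left join pairing lookup_key[i] with primary_keys[i]."""
    # Index the feature table by its primary-key tuple, in primary-key order.
    index = {tuple(r[k] for k in primary_keys): r for r in right}
    joined = []
    for row in left:
        # Build the probe key in the same order from the training columns.
        match = index.get(tuple(row[k] for k in lookup_key))
        out = dict(row)
        for name in feature_names:
            # Left join: rows without a match still appear, with None features.
            out[name] = match[name] if match is not None else None
        joined.append(out)
    return joined


training = [
    {"cid": 1, "transaction_dt": "2024-01-01", "rating": 4},
    {"cid": 2, "transaction_dt": "2024-01-01", "rating": 3},
]
customer_features = [{"customer_id": 1, "dt": "2024-01-01", "total_purchases_30d": 10}]
rows = left_join_on_keys(
    training,
    customer_features,
    lookup_key=["cid", "transaction_dt"],
    primary_keys=["customer_id", "dt"],
    feature_names=["total_purchases_30d"],
)
print([r["total_purchases_30d"] for r in rows])  # [10, None]
```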
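Create a training set containing two features with the same name from different feature tables
Use the optional output_name argument in the FeatureLookup. The name you provide replaces the feature name in the DataFrame returned by TrainingSet.load_df. For example, with the following code, the DataFrame returned by training_set.load_df includes the columns customer_height and product_height.
Feature Engineering in Unity Catalog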
import mlflow
from databricks.feature_engineering import FeatureEngineeringClient, FeatureLookup
feature_lookups = [
    FeatureLookup(
        table_name='ml.recommender_system.customer_features',
        feature_names=['height'],
        lookup_key='customer_id',
        output_name='customer_height',
    ),
    FeatureLookup(
        table_name='ml.recommender_system.product_features',
        feature_names=['height'],
        lookup_key='product_id',
        output_name='product_height'
    ),
]
fe = FeatureEngineeringClient()
with mlflow.start_run():
    # df must contain the lookup keys 'customer_id' and 'product_id' and the label 'rating'
    training_set = fe.create_training_set(
        df=df,
        feature_lookups=feature_lookups,
        label='rating',
        exclude_columns=['customer_id']
    )
    training_df = training_set.load_df()
Workspace Feature Store
import mlflow
from databricks.feature_store import FeatureLookup, FeatureStoreClient
feature_lookups = [
    FeatureLookup(
        table_name='recommender_system.customer_features',
        feature_names=['height'],
        lookup_key='customer_id',
        output_name='customer_height',
    ),
    FeatureLookup(
        table_name='recommender_system.product_features',
        feature_names=['height'],
        lookup_key='product_id',
        output_name='product_height'
    ),
]
fs = FeatureStoreClient()
with mlflow.start_run():
    # df must contain the lookup keys 'customer_id' and 'product_id' and the label 'rating'
    training_set = fs.create_training_set(
        df=df,
        feature_lookups=feature_lookups,
        label='rating',
        exclude_columns=['customer_id']
    )
    training_df = training_set.load_df()
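One way to picture output_name is as a renaming step applied while feature values are added to each row. The sketch below is plain Python, and apply_output_names is a hypothetical helper. It raises ValueError when two features would land in the same column. That behavior is an assumption made for illustration and is not documented library behavior.

```python
from typing import Any, Dict, List, Optional, Tuple

# (feature name, output_name or None, looked-up value) for one row
Lookup = Tuple[str, Optional[str], Any]


def apply_output_names(row: Dict[str, Any], lookups: List[Lookup]) -> Dict[str, Any]:
    """Hypothetical sketch: add looked-up values under output_name if given."""
    out = dict(row)
    for feature, output_name, value in lookups:
        column = output_name or feature
        if column in out:
            # Raising here is an assumption of this sketch, not library behavior.
            raise ValueError(f"duplicate column {column!r}; set a distinct output_name")
        out[column] = value
    return out


row = apply_output_names(
    {"rating": 4},
    [("height", "customer_height", 180.0), ("height", "product_height", 12.5)],
)
print(row)  # {'rating': 4, 'customer_height': 180.0, 'product_height': 12.5}
```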
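Create a training set using the same feature multiple times
To create a training set that uses the same feature joined by different lookup keys, use multiple FeatureLookups. Give each FeatureLookup output a unique output_name.
Feature Engineering in Unity Catalog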
feature_lookups = [
    FeatureLookup(
        table_name='ml.taxi_data.zip_features',
        feature_names=['temperature'],
        lookup_key=['pickup_zip'],
        output_name='pickup_temp'
    ),
    FeatureLookup(
        table_name='ml.taxi_data.zip_features',
        feature_names=['temperature'],
        lookup_key=['dropoff_zip'],
        output_name='dropoff_temp'
    )
]
Workspace Feature Store
feature_lookups = [
    FeatureLookup(
        table_name='taxi_data.zip_features',
        feature_names=['temperature'],
        lookup_key=['pickup_zip'],
        output_name='pickup_temp'
    ),
    FeatureLookup(
        table_name='taxi_data.zip_features',
        feature_names=['temperature'],
        lookup_key=['dropoff_zip'],
        output_name='dropoff_temp'
    )
]
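Each FeatureLookup above reads the same temperature feature, but through a different key column, so each trip row receives two values. The hypothetical helper lookup_twice sketches this in plain Python. As an illustrative assumption, it models the zip_features table as a dict keyed by zip code.

```python
from typing import Any, Dict, Optional

# Hypothetical stand-in for ml.taxi_data.zip_features, keyed by zip code.
ZipTable = Dict[str, Dict[str, float]]

# (lookup column, output_name) pairs mirroring the two FeatureLookups above.
LOOKUPS = (("pickup_zip", "pickup_temp"), ("dropoff_zip", "dropoff_temp"))


def lookup_twice(trip: Dict[str, Any], zip_features: ZipTable) -> Dict[str, Any]:
    """Attach the same 'temperature' feature once per lookup key."""
    out = dict(trip)
    for key_col, output_name in LOOKUPS:
        match: Optional[Dict[str, float]] = zip_features.get(trip[key_col])
        # An unknown zip code yields None rather than a temperature.
        out[output_name] = match["temperature"] if match is not None else None
    return out


zips = {"10001": {"temperature": 20.5}, "10002": {"temperature": 18.0}}
print(lookup_twice({"pickup_zip": "10001", "dropoff_zip": "10002"}, zips))
```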
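Create a training set for unsupervised machine learning models
Set label=None when you create a training set for an unsupervised learning model. For example, the following training set can be used to cluster customers into groups based on their interests:
Feature Engineering in Unity Catalog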
import mlflow
from databricks.feature_engineering import FeatureEngineeringClient, FeatureLookup
feature_lookups = [
    FeatureLookup(
        table_name='ml.recommender_system.customer_features',
        feature_names=['interests'],
        lookup_key='customer_id',
    ),
]
fe = FeatureEngineeringClient()
with mlflow.start_run():
    training_set = fe.create_training_set(
        df=df,
        feature_lookups=feature_lookups,
        label=None,
        exclude_columns=['customer_id']
    )
    training_df = training_set.load_df()
Workspace Feature Store
import mlflow
from databricks.feature_store import FeatureLookup, FeatureStoreClient
feature_lookups = [
    FeatureLookup(
        table_name='recommender_system.customer_features',
        feature_names=['interests'],
        lookup_key='customer_id',
    ),
]
fs = FeatureStoreClient()
with mlflow.start_run():
    training_set = fs.create_training_set(
        df=df,
        feature_lookups=feature_lookups,
        label=None,
        exclude_columns=['customer_id']
    )
    training_df = training_set.load_df()
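For downstream training code, the only difference label=None makes is that there is no target column to split off. The hypothetical helper split_xy sketches this with plain Python rows. It is not part of the TrainingSet API.

```python
from typing import Any, Dict, List, Optional, Tuple

Row = Dict[str, Any]


def split_xy(rows: List[Row], label: Optional[str]) -> Tuple[List[Row], Optional[List[Any]]]:
    """Hypothetical sketch: split rows into features and an optional target."""
    if label is None:
        # Unsupervised case: every remaining column is an input feature.
        return [dict(r) for r in rows], None
    features = [{k: v for k, v in r.items() if k != label} for r in rows]
    target = [r[label] for r in rows]
    return features, target


X, y = split_xy([{"interests": "hiking"}, {"interests": "cooking"}], label=None)
print(y)  # None
```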
Create a TrainingSet when using a view as a feature table
To use a view as a feature table, you must use databricks-feature-engineering version 0.7.0 or above, which is built into Databricks Runtime 16.0 ML.
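The view must be a simple SELECT view from a source Delta table. A simple SELECT view is a view that is created from a single Delta table in Unity Catalog, can be used as a feature table, and selects its primary keys without JOIN, GROUP BY, or DISTINCT clauses. Acceptable keywords in the SQL statement are SELECT, FROM, WHERE, ORDER BY, LIMIT, and OFFSET.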
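Before creating a view, you might want a quick pre-check of its query text against these rules. The function looks_like_simple_select below is a rough, hypothetical regular-expression heuristic. It is neither a SQL parser nor Databricks's own validation. It rejects JOIN, GROUP BY, and DISTINCT, and it accepts only the shape SELECT ... FROM <one table>, optionally followed by WHERE, ORDER BY, LIMIT, and OFFSET. It can be fooled, for example by a subquery inside a WHERE clause.

```python
import re

# Clauses the text above rules out for a simple SELECT view.
FORBIDDEN = re.compile(r"\b(JOIN|GROUP\s+BY|DISTINCT)\b", re.IGNORECASE)

# SELECT <columns> FROM <one table> [WHERE ...] [ORDER BY ...] [LIMIT n] [OFFSET n]
SIMPLE_SELECT = re.compile(
    r"^\s*SELECT\s+.+?\s+FROM\s+[\w.`]+"
    r"(\s+WHERE\s+.+?)?"
    r"(\s+ORDER\s+BY\s+.+?)?"
    r"(\s+LIMIT\s+\d+)?"
    r"(\s+OFFSET\s+\d+)?"
    r"\s*;?\s*$",
    re.IGNORECASE | re.DOTALL,
)


def looks_like_simple_select(query: str) -> bool:
    """Rough heuristic only; not a SQL parser or the platform's own check."""
    return FORBIDDEN.search(query) is None and SIMPLE_SELECT.match(query) is not None


view_sql = (
    "SELECT cid, dt, pid, rating FROM ml.recommender_system.customer_table "
    "WHERE rating > 3"
)
print(looks_like_simple_select(view_sql))  # True
print(looks_like_simple_select("SELECT DISTINCT cid FROM t"))  # False
```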
In the following example, ml.recommender_system.customer_table has primary keys cid and dt, where dt is a time series column. The example assumes that the DataFrame training_df has columns cid, dt, and label:
from databricks.feature_engineering import FeatureEngineeringClient, FeatureLookup
# Create a simple SELECT view over the source Delta table; the view is used as the feature table
spark.sql("CREATE OR REPLACE VIEW ml.recommender_system.customer_features AS SELECT cid, dt, pid, rating FROM ml.recommender_system.customer_table WHERE rating > 3")
feature_lookups = [
    FeatureLookup(
        table_name='ml.recommender_system.customer_features',
        feature_names=['pid', 'rating'],
        lookup_key=['cid'],
        timestamp_lookup_key='dt'
    ),
]
fe = FeatureEngineeringClient()
training_set = fe.create_training_set(
    df=training_df,
    feature_lookups=feature_lookups,
    label='label'
)
training_df = training_set.load_df()
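Train models and perform batch inference with feature tables
When you train a model using features from Feature Store, the model retains references to the features. When you use the model for inference, you can choose to have it retrieve feature values from Feature Store. You must provide the primary key(s) of the features used in the model. The model retrieves the features it requires from Feature Store in your workspace and then joins the feature values as needed during scoring.
To support feature lookup at inference time: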
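- You must log the model using the log_model method of FeatureEngineeringClient (for Feature Engineering in Unity Catalog) or FeatureStoreClient (for Workspace Feature Store).
- You must use the DataFrame returned by TrainingSet.load_df to train the model. If you modify this DataFrame in any way before using it to train the model, the modification is not applied when you use the model for inference, which decreases the performance of the model.
- The model type must have a corresponding python_flavor in MLflow. MLflow supports most Python model training frameworks, including:
  - scikit-learn
  - keras
  - PyTorch
  - SparkML
  - LightGBM
  - XGBoost
  - TensorFlow Keras (using the python_flavor mlflow.keras)
  - Custom MLflow pyfunc models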
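The second requirement is easiest to see with a toy model. In the plain-Python sketch below, a feature is divided by 10 before fitting. If raw feature values, as they would be looked up at inference, are fed straight to the fitted model, the predictions are off by that factor. A wrapper that re-applies the transform inside predict restores them. fit_slope and ModelWithPreprocessing are hypothetical names. The wrapper only stands in for the idea of packaging preprocessing with the model (for example, in a custom pyfunc model) and does not use the MLflow API.

```python
from typing import Callable, List


def fit_slope(xs: List[float], ys: List[float]) -> float:
    """Least-squares slope of a line through the origin (a toy model)."""
    return sum(x * y for x, y in zip(xs, ys)) / sum(x * x for x in xs)


class ModelWithPreprocessing:
    """Hypothetical wrapper that re-applies the training-time transform."""

    def __init__(self, transform: Callable[[float], float], slope: float) -> None:
        self.transform = transform
        self.slope = slope

    def predict(self, raw_values: List[float]) -> List[float]:
        # Raw values, as a feature lookup would return them, are transformed first.
        return [self.slope * self.transform(x) for x in raw_values]


def scale(x: float) -> float:
    # The "modification" made to the training DataFrame before fitting.
    return x / 10.0


raw = [10.0, 20.0, 30.0]  # raw feature values
target = [1.0, 2.0, 3.0]
slope = fit_slope([scale(x) for x in raw], target)

naive = [slope * x for x in raw]  # transform skipped at inference
wrapped = ModelWithPreprocessing(scale, slope).predict(raw)
print(naive)    # [10.0, 20.0, 30.0]
print(wrapped)  # [1.0, 2.0, 3.0]
```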
Feature Engineering in Unity Catalog
# Train model
import mlflow
from databricks.feature_engineering import FeatureEngineeringClient, FeatureLookup
from sklearn import linear_model
feature_lookups = [
    FeatureLookup(
        table_name='ml.recommender_system.customer_features',
        feature_names=['total_purchases_30d'],
        lookup_key='customer_id',
    ),
    FeatureLookup(
        table_name='ml.recommender_system.product_features',
        feature_names=['category'],
        lookup_key='product_id'
    )
]
fe = FeatureEngineeringClient()
with mlflow.start_run():
    # df has columns ['customer_id', 'product_id', 'rating']
    training_set = fe.create_training_set(
        df=df,
        feature_lookups=feature_lookups,
        label='rating',
        exclude_columns=['customer_id', 'product_id']
    )
    training_df = training_set.load_df().toPandas()
    # "training_df" columns ['total_purchases_30d', 'category', 'rating']
    X_train = training_df.drop(['rating'], axis=1)
    y_train = training_df.rating
    model = linear_model.LinearRegression().fit(X_train, y_train)
    fe.log_model(
        model=model,
        artifact_path="recommendation_model",
        flavor=mlflow.sklearn,
        training_set=training_set,
        registered_model_name="recommendation_model"
    )
# Batch inference
# If the model at model_uri is packaged with the features, the FeatureEngineeringClient.score_batch()
# call automatically retrieves the required features from Feature Store before scoring the model.
# The DataFrame returned by score_batch() augments batch_df with
# columns containing the feature values and a column containing model predictions.
fe = FeatureEngineeringClient()
# batch_df has columns 'customer_id' and 'product_id'
predictions = fe.score_batch(
    model_uri=model_uri,
    df=batch_df
)
# The 'predictions' DataFrame has these columns:
# 'customer_id', 'product_id', 'total_purchases_30d', 'category', 'prediction'
Workspace Feature Store
# Train model
import mlflow
from databricks.feature_store import FeatureLookup, FeatureStoreClient
from sklearn import linear_model
feature_lookups = [
    FeatureLookup(
        table_name='recommender_system.customer_features',
        feature_names=['total_purchases_30d'],
        lookup_key='customer_id',
    ),
    FeatureLookup(
        table_name='recommender_system.product_features',
        feature_names=['category'],
        lookup_key='product_id'
    )
]
fs = FeatureStoreClient()
with mlflow.start_run():
    # df has columns ['customer_id', 'product_id', 'rating']
    training_set = fs.create_training_set(
        df=df,
        feature_lookups=feature_lookups,
        label='rating',
        exclude_columns=['customer_id', 'product_id']
    )
    training_df = training_set.load_df().toPandas()
    # "training_df" columns ['total_purchases_30d', 'category', 'rating']
    X_train = training_df.drop(['rating'], axis=1)
    y_train = training_df.rating
    model = linear_model.LinearRegression().fit(X_train, y_train)
    fs.log_model(
        model=model,
        artifact_path="recommendation_model",
        flavor=mlflow.sklearn,
        training_set=training_set,
        registered_model_name="recommendation_model"
    )
# Batch inference
# If the model at model_uri is packaged with the features, the FeatureStoreClient.score_batch()
# call automatically retrieves the required features from Feature Store before scoring the model.
# The DataFrame returned by score_batch() augments batch_df with
# columns containing the feature values and a column containing model predictions.
fs = FeatureStoreClient()
# batch_df has columns 'customer_id' and 'product_id'
predictions = fs.score_batch(
    model_uri=model_uri,
    df=batch_df
)
# The 'predictions' DataFrame has these columns:
# 'customer_id', 'product_id', 'total_purchases_30d', 'category', 'prediction'
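The comments above describe what score_batch returns: the input rows, augmented with looked-up feature columns and a prediction column. The following plain-Python sketch models only that shape. score_batch_sketch is a hypothetical function and is not the client's implementation. For brevity it uses a single lookup key and a dict as the feature table, and predict is any callable you supply.

```python
from typing import Any, Callable, Dict, List, Sequence

Row = Dict[str, Any]


def score_batch_sketch(
    batch: List[Row],
    feature_table: Dict[Any, Row],
    lookup_key: str,
    feature_names: Sequence[str],
    predict: Callable[[Row], float],
) -> List[Row]:
    """Hypothetical sketch of the output shape described for score_batch()."""
    scored = []
    for row in batch:
        # Look up the feature row for this key; unknown keys yield no features.
        features = feature_table.get(row[lookup_key], {})
        out = dict(row)
        for name in feature_names:
            out[name] = features.get(name)
        # The prediction is appended as an extra column.
        out["prediction"] = predict(out)
        scored.append(out)
    return scored


table = {1: {"total_purchases_30d": 5.0}}
preds = score_batch_sketch(
    batch=[{"customer_id": 1}, {"customer_id": 2}],
    feature_table=table,
    lookup_key="customer_id",
    feature_names=["total_purchases_30d"],
    predict=lambda r: 2.0 * (r["total_purchases_30d"] or 0.0),
)
print(preds[0])  # {'customer_id': 1, 'total_purchases_30d': 5.0, 'prediction': 10.0}
```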
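Use custom feature values when scoring a model packaged with feature metadata
By default, a model packaged with feature metadata looks up features from feature tables at inference. To use custom feature values for scoring, include them in the DataFrame passed to FeatureEngineeringClient.score_batch (for Feature Engineering in Unity Catalog) or FeatureStoreClient.score_batch (for Workspace Feature Store).
For example, suppose you package a model with these two features:
Feature Engineering in Unity Catalog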
feature_lookups = [
    FeatureLookup(
        table_name='ml.recommender_system.customer_features',
        feature_names=['account_creation_date', 'num_lifetime_purchases'],
        lookup_key='customer_id',
    ),
]
Workspace Feature Store
feature_lookups = [
    FeatureLookup(
        table_name='recommender_system.customer_features',
        feature_names=['account_creation_date', 'num_lifetime_purchases'],
        lookup_key='customer_id',
    ),
]
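At inference, you can provide custom values for the feature account_creation_date by calling score_batch on a DataFrame that includes a column named account_creation_date. In this case, the API looks up only the num_lifetime_purchases feature from Feature Store and uses the provided custom account_creation_date column values for model scoring.
Feature Engineering in Unity Catalog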
# batch_df has columns ['customer_id', 'account_creation_date']
predictions = fe.score_batch(
    model_uri='models:/ban_prediction_model/1',
    df=batch_df
)
Workspace Feature Store
# batch_df has columns ['customer_id', 'account_creation_date']
predictions = fs.score_batch(
    model_uri='models:/ban_prediction_model/1',
    df=batch_df
)
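The rule in this example fits in a few lines: features already present in the scoring DataFrame are used as provided, and only the remaining ones are looked up. features_to_look_up is a hypothetical helper that works on column names only.

```python
from typing import List, Sequence


def features_to_look_up(batch_columns: Sequence[str], model_features: Sequence[str]) -> List[str]:
    """Hypothetical sketch: features absent from the batch are looked up;
    features present in the batch are used as provided."""
    provided = set(batch_columns)
    return [f for f in model_features if f not in provided]


model_features = ["account_creation_date", "num_lifetime_purchases"]
print(features_to_look_up(["customer_id", "account_creation_date"], model_features))
# prints ['num_lifetime_purchases']
```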
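Train and score a model using a combination of Feature Store features and data residing outside Feature Store
You can train and score a model using a combination of Feature Store features and data from outside Feature Store. When you package the model with feature metadata, the model retrieves feature values from Feature Store for inference.
To train a model, include the extra data as columns in the DataFrame passed to FeatureEngineeringClient.create_training_set (for Feature Engineering in Unity Catalog) or FeatureStoreClient.create_training_set (for Workspace Feature Store). This example uses the feature total_purchases_30d from Feature Store and the external column browser.
Feature Engineering in Unity Catalog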
feature_lookups = [
    FeatureLookup(
        table_name='ml.recommender_system.customer_features',
        feature_names=['total_purchases_30d'],
        lookup_key='customer_id',
    ),
]
fe = FeatureEngineeringClient()
# df has columns ['customer_id', 'browser', 'rating']
training_set = fe.create_training_set(
    df=df,
    feature_lookups=feature_lookups,
    label='rating',
    exclude_columns=['customer_id']  # 'browser' is not excluded
)
Workspace Feature Store
feature_lookups = [
    FeatureLookup(
        table_name='recommender_system.customer_features',
        feature_names=['total_purchases_30d'],
        lookup_key='customer_id',
    ),
]
fs = FeatureStoreClient()
# df has columns ['customer_id', 'browser', 'rating']
training_set = fs.create_training_set(
    df=df,
    feature_lookups=feature_lookups,
    label='rating',
    exclude_columns=['customer_id']  # 'browser' is not excluded
)
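At inference, the DataFrame passed to FeatureEngineeringClient.score_batch (for Feature Engineering in Unity Catalog) or FeatureStoreClient.score_batch (for Workspace Feature Store) must include the browser column.
Feature Engineering in Unity Catalog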
# At inference, 'browser' must be provided
# batch_df has columns ['customer_id', 'browser']
predictions = fe.score_batch(
    model_uri=model_uri,
    df=batch_df
)
Workspace Feature Store
# At inference, 'browser' must be provided
# batch_df has columns ['customer_id', 'browser']
predictions = fs.score_batch(
    model_uri=model_uri,
    df=batch_df
)
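The model cannot look up browser, so it cannot score a DataFrame that lacks that column. A pre-flight check such as the hypothetical missing_inference_columns below makes this requirement explicit before you call score_batch. It is plain Python and works on column names only. You decide which columns count as lookup keys and which count as external data.

```python
from typing import List, Sequence


def missing_inference_columns(
    batch_columns: Sequence[str],
    lookup_keys: Sequence[str],
    external_columns: Sequence[str],
) -> List[str]:
    """Hypothetical pre-flight check: columns score_batch would need but lacks."""
    present = set(batch_columns)
    # Lookup keys are needed to fetch features; external columns cannot be fetched.
    required = list(lookup_keys) + list(external_columns)
    return [c for c in required if c not in present]


print(missing_inference_columns(["customer_id"], ["customer_id"], ["browser"]))  # ['browser']
```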
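Load models and perform batch inference using MLflow
After a model has been logged using the log_model method of FeatureEngineeringClient (for Feature Engineering in Unity Catalog) or FeatureStoreClient (for Workspace Feature Store), you can use MLflow at inference. The predict method of the model returned by mlflow.pyfunc.load_model retrieves feature values from Feature Store and also joins any values provided at inference time. You must provide the primary key(s) of the features used in the model.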
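Note
Batch inference with MLflow requires MLflow 2.11 or above. Models that use time series feature tables are not supported. To do batch inference with time series feature tables, use score_batch. See Train models and perform batch inference with feature tables.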
# Train model
import mlflow
from databricks.feature_engineering import FeatureEngineeringClient, FeatureLookup
from sklearn import linear_model
feature_lookups = [
    FeatureLookup(
        table_name='ml.recommender_system.customer_features',
        feature_names=['total_purchases_30d'],
        lookup_key='customer_id',
    ),
    FeatureLookup(
        table_name='ml.recommender_system.product_features',
        feature_names=['category'],
        lookup_key='product_id'
    )
]
fe = FeatureEngineeringClient()
with mlflow.start_run():
    # df has columns ['customer_id', 'product_id', 'rating']
    training_set = fe.create_training_set(
        df=df,
        feature_lookups=feature_lookups,
        label='rating',
        exclude_columns=['customer_id', 'product_id']
    )
    training_df = training_set.load_df().toPandas()
    # "training_df" columns ['total_purchases_30d', 'category', 'rating']
    X_train = training_df.drop(['rating'], axis=1)
    y_train = training_df.rating
    model = linear_model.LinearRegression().fit(X_train, y_train)
    fe.log_model(
        model=model,
        artifact_path="recommendation_model",
        flavor=mlflow.sklearn,
        training_set=training_set,
        registered_model_name="recommendation_model",
        # refers to the default value of "result_type" if not provided at inference
        params={"result_type": "double"},
    )
# Batch inference with MLflow
# NOTE: the result_type parameter can only be used if a default value
# is provided in log_model. This is automatically done for all models
# logged using Databricks Runtime for ML 15.0 or above.
# For earlier Databricks Runtime versions, use set_result_type as shown below.
# batch_df has columns 'customer_id' and 'product_id'
loaded_model = mlflow.pyfunc.load_model(model_version_uri)
# Option 1: the result_type parameter was provided in log_model
predictions = loaded_model.predict(batch_df, {"result_type": "double"})
# Option 2: the result_type parameter was NOT provided in log_model
loaded_model._model_impl.set_result_type("double")
predictions = loaded_model.predict(batch_df)
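The two options at the end differ in where result_type comes from. A value logged as a default in log_model applies whenever none is passed to predict, and a value passed to predict takes precedence. The hypothetical resolve_params sketches that merge in plain Python. Raising KeyError for a parameter with no logged default is a simplification chosen for this sketch. MLflow's own handling of unknown parameters may differ.

```python
from typing import Any, Dict, Optional


def resolve_params(
    logged_defaults: Dict[str, Any],
    provided: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Hypothetical sketch: inference-time params override logged defaults."""
    provided = provided or {}
    unknown = sorted(set(provided) - set(logged_defaults))
    if unknown:
        # Simplification for this sketch: a parameter needs a logged default.
        raise KeyError(f"no default logged for {unknown}")
    return {**logged_defaults, **provided}


defaults = {"result_type": "double"}
print(resolve_params(defaults))                           # {'result_type': 'double'}
print(resolve_params(defaults, {"result_type": "float"}))  # {'result_type': 'float'}
```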
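Handle missing feature values
When a non-existent lookup key is passed to the model for prediction, the feature value fetched by FeatureLookup can be either None or NaN, depending on the environment. Your model implementation should be able to handle both values.
For offline applications using fe.score_batch, the returned value for a missing feature is NaN.
To handle missing feature values when using on-demand features, see How to handle missing feature values.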
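A missing value may arrive as either None or NaN, so a check that treats both the same way keeps model code simple. The sketch below uses only the standard library. is_missing and fill_missing are hypothetical helpers, and filling with a fixed default is just one possible strategy. Because numpy.float64 is a subclass of Python float, the isinstance check also covers NaN values coming from NumPy or pandas.

```python
import math
from typing import Any


def is_missing(value: Any) -> bool:
    """True for None and for a float NaN, the two forms described above."""
    return value is None or (isinstance(value, float) and math.isnan(value))


def fill_missing(value: Any, default: float) -> Any:
    """Replace a missing feature value with a fixed default (one possible strategy)."""
    return default if is_missing(value) else value


values = [3.5, None, float("nan")]
print([fill_missing(v, 0.0) for v in values])  # [3.5, 0.0, 0.0]
```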