使用功能训练模型

本文介绍如何使用 Unity Catalog 或本地工作区功能存储中的功能工程训练模型。 必须先创建训练数据集,该数据集定义要使用的特征以及如何联接它们。 然后,在训练模型时,该模型将保留对这些特征的引用。

使用 Unity Catalog 中的特征工程训练模型时,可以在目录资源管理器中查看模型的世系。 自动跟踪和显示用于创建模型的表和函数。 请参阅查看特征存储世系

使用模型进行推理时,可以选择让它从特征存储中检索特征值。特征存储模型也与 MLflow pyfunc 接口兼容,因此可以使用 MLflow 对特征表执行批量推理。

创建训练数据集

若要从特征表中选择特定的特征用于模型训练,可以使用 FeatureEngineeringClient.create_training_set(适用于 Unity Catalog 中的特征工程)或 FeatureStoreClient.create_training_set(适用于工作区特征存储)API 和名为 FeatureLookup 的对象创建训练数据集。 FeatureLookup 指定要在训练集中使用的每个特征,包括特征表的名称、特征的名称,以及在将特征表与传递给 create_training_set 的数据帧联接时要使用的键。 有关详细信息,请参阅特征查找

创建 FeatureLookup 时,请使用 feature_names 参数。 feature_names 取值为单个特征名称、特征名称列表,或取值 None,表示在创建训练集时查找特征表中的所有特征(不包括主键)。

注意

该数据帧中 lookup_key 列的类型和顺序必须与引用特征表的主键(时间戳键除外)的类型和顺序相匹配。

本文包含了上述两个语法版本的代码示例。

在此示例中,trainingSet.load_df 返回的数据帧包含 feature_lookups 中每个特征对应的一列。 它会保留提供给的 create_training_set 的数据帧的所有列,但使用 exclude_columns 排除的列除外。

Unity Catalog 中的特征工程

from databricks.feature_engineering import FeatureEngineeringClient, FeatureLookup

# The model training uses two features from the 'customer_features' feature table and
# a single feature from 'product_features'
feature_lookups = [
    FeatureLookup(
      table_name='ml.recommender_system.customer_features',
      feature_names=['total_purchases_30d', 'total_purchases_7d'],
      lookup_key='customer_id'
    ),
    FeatureLookup(
      table_name='ml.recommender_system.product_features',
      feature_names=['category'],
      lookup_key='product_id'
    )
  ]

fe = FeatureEngineeringClient()

# Create a training set using training DataFrame and features from Feature Store
# The training DataFrame must contain all lookup keys from the set of feature lookups,
# in this case 'customer_id' and 'product_id'. It must also contain all labels used
# for training, in this case 'rating'.
training_set = fe.create_training_set(
  df=training_df,
  feature_lookups=feature_lookups,
  label='rating',
  exclude_columns=['customer_id', 'product_id']
)

training_df = training_set.load_df()

工作区特征存储

from databricks.feature_store import FeatureLookup, FeatureStoreClient

# The model training uses two features from the 'customer_features' feature table and
# a single feature from 'product_features'
feature_lookups = [
    FeatureLookup(
      table_name='recommender_system.customer_features',
      feature_names=['total_purchases_30d', 'total_purchases_7d'],
      lookup_key='customer_id'
    ),
    FeatureLookup(
      table_name='recommender_system.product_features',
      feature_names=['category'],
      lookup_key='product_id'
    )
  ]

fs = FeatureStoreClient()

# Create a training set using training DataFrame and features from Feature Store
# The training DataFrame must contain all lookup keys from the set of feature lookups,
# in this case 'customer_id' and 'product_id'. It must also contain all labels used
# for training, in this case 'rating'.
training_set = fs.create_training_set(
  df=training_df,
  feature_lookups=feature_lookups,
  label='rating',
  exclude_columns=['customer_id', 'product_id']
)

training_df = training_set.load_df()

当查找键与主键不匹配时创建训练集

FeatureLookup 中的参数 lookup_key 用于训练集中的列名。 create_training_set 使用创建特征表时指定主键的顺序,在 lookup_key 参数中指定的训练集内的列之间执行有序联接。

在此示例中,recommender_system.customer_features 具有以下主键:customer_iddt

recommender_system.product_features 特征表具有主键 product_id

如果 training_df 包含以下列:

  • cid
  • transaction_dt
  • product_id
  • rating

则以下代码将为 TrainingSet 创建正确的特征查找:

Unity Catalog 中的特征工程

feature_lookups = [
    FeatureLookup(
      table_name='ml.recommender_system.customer_features',
      feature_names=['total_purchases_30d', 'total_purchases_7d'],
      lookup_key=['cid', 'transaction_dt']
    ),
    FeatureLookup(
      table_name='ml.recommender_system.product_features',
      feature_names=['category'],
      lookup_key='product_id'
    )
  ]

工作区特征存储

feature_lookups = [
    FeatureLookup(
      table_name='recommender_system.customer_features',
      feature_names=['total_purchases_30d', 'total_purchases_7d'],
      lookup_key=['cid', 'transaction_dt']
    ),
    FeatureLookup(
      table_name='recommender_system.product_features',
      feature_names=['category'],
      lookup_key='product_id'
    )
  ]

调用 create_training_set 时,它会通过执行左联接创建一个训练数据集。该左联接操作使用对应于 (cid,transaction_dt) 的键 (customer_id,dt) 来联接 recommender_system.customer_featurestraining_df 表,如以下代码中所示:

Unity Catalog 中的特征工程

customer_features_df = spark.sql("SELECT * FROM ml.recommender_system.customer_features")
product_features_df = spark.sql("SELECT * FROM ml.recommender_system.product_features")

training_df.join(
  customer_features_df,
  on=[training_df.cid == customer_features_df.customer_id,
      training_df.transaction_dt == customer_features_df.dt],
  how="left"
).join(
  product_features_df,
  on="product_id",
  how="left"
)

工作区特征存储

customer_features_df = spark.sql("SELECT * FROM recommender_system.customer_features")
product_features_df = spark.sql("SELECT * FROM recommender_system.product_features")

training_df.join(
  customer_features_df,
  on=[training_df.cid == customer_features_df.customer_id,
      training_df.transaction_dt == customer_features_df.dt],
  how="left"
).join(
  product_features_df,
  on="product_id",
  how="left"
)

创建包含来自不同特征表的两个同名特征的训练集

FeatureLookup 中使用可选参数 output_name。 提供的名称用于取代 TrainingSet.load_df 返回的数据帧中的特征名称。 例如,在以下代码中,training_set.load_df 返回的数据帧包含 customer_heightproduct_height 列。

Unity Catalog 中的特征工程

feature_lookups = [
    FeatureLookup(
      table_name='ml.recommender_system.customer_features',
      feature_names=['height'],
      lookup_key='customer_id',
      output_name='customer_height',
    ),
    FeatureLookup(
      table_name='ml.recommender_system.product_features',
      feature_names=['height'],
      lookup_key='product_id',
      output_name='product_height'
    ),
  ]

fe = FeatureEngineeringClient()

with mlflow.start_run():
  training_set = fe.create_training_set(
    df=df,
    feature_lookups=feature_lookups,
    label='rating',
    exclude_columns=['customer_id']
  )
  training_df = training_set.load_df()

工作区特征存储

feature_lookups = [
    FeatureLookup(
      table_name='recommender_system.customer_features',
      feature_names=['height'],
      lookup_key='customer_id',
      output_name='customer_height',
    ),
    FeatureLookup(
      table_name='recommender_system.product_features',
      feature_names=['height'],
      lookup_key='product_id',
      output_name='product_height'
    ),
  ]

fs = FeatureStoreClient()

with mlflow.start_run():
  training_set = fs.create_training_set(
    df=df,
    feature_lookups=feature_lookups,
    label='rating',
    exclude_columns=['customer_id']
  )
  training_df = training_set.load_df()

多次使用相同特征创建训练集

若要使用由不同查找键联接的相同特征创建训练集,请使用多个 FeatureLookup。 为每个 FeatureLookup 输出使用唯一 output_name

Unity Catalog 中的特征工程

feature_lookups = [
    FeatureLookup(
      table_name='ml.taxi_data.zip_features',
      feature_names=['temperature'],
      lookup_key=['pickup_zip'],
      output_name='pickup_temp'
    ),
    FeatureLookup(
      table_name='ml.taxi_data.zip_features',
      feature_names=['temperature'],
      lookup_key=['dropoff_zip'],
      output_name='dropoff_temp'
    )
  ]

工作区特征存储

feature_lookups = [
    FeatureLookup(
      table_name='taxi_data.zip_features',
      feature_names=['temperature'],
      lookup_key=['pickup_zip'],
      output_name='pickup_temp'
    ),
    FeatureLookup(
      table_name='taxi_data.zip_features',
      feature_names=['temperature'],
      lookup_key=['dropoff_zip'],
      output_name='dropoff_temp'
    )
  ]

为非监督式机器学习模型创建一个训练集

在为非监督式学习模型创建训练集时设置 label=None。 例如,以下训练集可用于根据不同客户的兴趣将不同的客户分组:

Unity Catalog 中的特征工程

feature_lookups = [
    FeatureLookup(
      table_name='ml.recommender_system.customer_features',
      feature_names=['interests'],
      lookup_key='customer_id',
    ),
  ]

fe = FeatureEngineeringClient()
with mlflow.start_run():
  training_set = fe.create_training_set(
    df=df,
    feature_lookups=feature_lookups,
    label=None,
    exclude_columns=['customer_id']
  )

  training_df = training_set.load_df()

工作区特征存储

feature_lookups = [
    FeatureLookup(
      table_name='recommender_system.customer_features',
      feature_names=['interests'],
      lookup_key='customer_id',
    ),
  ]

fs = FeatureStoreClient()
with mlflow.start_run():
  training_set = fs.create_training_set(
    df=df,
    feature_lookups=feature_lookups,
    label=None,
    exclude_columns=['customer_id']
  )

  training_df = training_set.load_df()

训练模型并使用特征表执行批量推理

使用特征存储中的特征训练模型时,该模型将保留对这些特征的引用。 将模型用于推理时,可以选择让该模型从特征存储中检索特征值。 必须提供模型中使用的特征的主键。 模型从工作区中的特征存储检索所需的特征。 然后,它会在评分期间根据需要联接特征值。

若要在推理时支持特征查找:

  • 必须使用 FeatureEngineeringClient(适用于 Unity Catalog 中的特征工程)或 FeatureStoreClient(适用于工作区特征存储)的 log_model 方法来记录模型。
  • 必须使用 TrainingSet.load_df 返回的数据帧训练模型。 如果在使用此数据帧训练模型之前以任何方式对其进行修改,在将该模型用于推理时将不会应用这些修改。 这会降低模型的性能。
  • 模型类型在 MLflow 中必须有对应的 python_flavor。 MLflow 支持大多数 Python 模型训练框架,包括:
    • scikit-learn
    • keras
    • PyTorch
    • SparkML
    • LightGBM
    • XGBoost
    • TensorFlow Keras(使用 python_flavormlflow.keras
  • 自定义的 MLflow pyfunc 模型

Unity Catalog 中的特征工程

# Train model
import mlflow
from sklearn import linear_model

feature_lookups = [
    FeatureLookup(
      table_name='ml.recommender_system.customer_features',
      feature_names=['total_purchases_30d'],
      lookup_key='customer_id',
    ),
    FeatureLookup(
      table_name='ml.recommender_system.product_features',
      feature_names=['category'],
      lookup_key='product_id'
    )
  ]

fe = FeatureEngineeringClient()

with mlflow.start_run():

  # df has columns ['customer_id', 'product_id', 'rating']
  training_set = fe.create_training_set(
    df=df,
    feature_lookups=feature_lookups,
    label='rating',
    exclude_columns=['customer_id', 'product_id']
  )

  training_df = training_set.load_df().toPandas()

  # "training_df" columns ['total_purchases_30d', 'category', 'rating']
  X_train = training_df.drop(['rating'], axis=1)
  y_train = training_df.rating

  model = linear_model.LinearRegression().fit(X_train, y_train)

  fe.log_model(
    model=model,
    artifact_path="recommendation_model",
    flavor=mlflow.sklearn,
    training_set=training_set,
    registered_model_name="recommendation_model"
  )

# Batch inference

# If the model at model_uri is packaged with the features, the FeatureStoreClient.score_batch()
# call automatically retrieves the required features from Feature Store before scoring the model.
# The DataFrame returned by score_batch() augments batch_df with
# columns containing the feature values and a column containing model predictions.

fe = FeatureEngineeringClient()

# batch_df has columns 'customer_id' and 'product_id'
predictions = fe.score_batch(
    model_uri=model_uri,
    df=batch_df
)

# The 'predictions' DataFrame has these columns:
# 'customer_id', 'product_id', 'total_purchases_30d', 'category', 'prediction'

工作区特征存储

# Train model
import mlflow
from sklearn import linear_model

feature_lookups = [
    FeatureLookup(
      table_name='recommender_system.customer_features',
      feature_names=['total_purchases_30d'],
      lookup_key='customer_id',
    ),
    FeatureLookup(
      table_name='recommender_system.product_features',
      feature_names=['category'],
      lookup_key='product_id'
    )
  ]

fs = FeatureStoreClient()

with mlflow.start_run():

  # df has columns ['customer_id', 'product_id', 'rating']
  training_set = fs.create_training_set(
    df=df,
    feature_lookups=feature_lookups,
    label='rating',
    exclude_columns=['customer_id', 'product_id']
  )

  training_df = training_set.load_df().toPandas()

  # "training_df" columns ['total_purchases_30d', 'category', 'rating']
  X_train = training_df.drop(['rating'], axis=1)
  y_train = training_df.rating

  model = linear_model.LinearRegression().fit(X_train, y_train)

  fs.log_model(
    model=model,
    artifact_path="recommendation_model",
    flavor=mlflow.sklearn,
    training_set=training_set,
    registered_model_name="recommendation_model"
  )

# Batch inference

# If the model at model_uri is packaged with the features, the FeatureStoreClient.score_batch()
# call automatically retrieves the required features from Feature Store before scoring the model.
# The DataFrame returned by score_batch() augments batch_df with
# columns containing the feature values and a column containing model predictions.

fs = FeatureStoreClient()

# batch_df has columns 'customer_id' and 'product_id'
predictions = fs.score_batch(
    model_uri=model_uri,
    df=batch_df
)

# The 'predictions' DataFrame has these columns:
# 'customer_id', 'product_id', 'total_purchases_30d', 'category', 'prediction'

对随特征元数据一起打包的模型评分时使用自定义特征值

默认情况下,随特征元数据一起打包的模型在推理时将从特征表中查找特征。 若要使用自定义特征值进行评分,请在传递给 FeatureEngineeringClient.score_batch(适用于 Unity Catalog 中的特征工程)或 FeatureStoreClient.score_batch(适用于工作区特征存储)的数据帧中包含它们。

例如,假设你要将模型随以下两个特征一起打包:

Unity Catalog 中的特征工程

feature_lookups = [
    FeatureLookup(
      table_name='ml.recommender_system.customer_features',
      feature_names=['account_creation_date', 'num_lifetime_purchases'],
      lookup_key='customer_id',
    ),
  ]

工作区特征存储

feature_lookups = [
    FeatureLookup(
      table_name='recommender_system.customer_features',
      feature_names=['account_creation_date', 'num_lifetime_purchases'],
      lookup_key='customer_id',
    ),
  ]

在推理时,可以通过对包含 account_creation_date 列的数据帧调用 score_batch,来为特征 account_creation_date 提供自定义值。 在这种情况下,API 只会在特征存储中查找 num_lifetime_purchases 特征,并使用提供的自定义 account_creation_date 列值进行模型评分。

Unity Catalog 中的特征工程

# batch_df has columns ['customer_id', 'account_creation_date']
predictions = fe.score_batch(
  model_uri='models:/ban_prediction_model/1',
  df=batch_df
)

工作区特征存储

# batch_df has columns ['customer_id', 'account_creation_date']
predictions = fs.score_batch(
  model_uri='models:/ban_prediction_model/1',
  df=batch_df
)

结合使用特征存储特征和驻留在特征存储外部的数据对模型进行训练和评分

可以结合使用特征存储特征和特征存储外部的数据对模型进行训练和评分。 将模型随特征元数据一起打包时,该模型将从特征存储中检索特征值用于推理。

若要训练模型,请将额外数据作为列包含在传递给 FeatureEngineeringClient.create_training_set(适用于 Unity Catalog 中的特征工程)或 FeatureStoreClient.create_training_set(适用于工作区特征存储)的数据帧中。 此示例使用特征存储中的特征 total_purchases_30d 以及外部列 browser

Unity Catalog 中的特征工程

feature_lookups = [
    FeatureLookup(
      table_name='ml.recommender_system.customer_features',
      feature_names=['total_purchases_30d'],
      lookup_key='customer_id',
    ),
  ]

fe = FeatureEngineeringClient()

# df has columns ['customer_id', 'browser', 'rating']
training_set = fe.create_training_set(
  df=df,
  feature_lookups=feature_lookups,
  label='rating',
  exclude_columns=['customer_id']  # 'browser' is not excluded
)

工作区特征存储

feature_lookups = [
    FeatureLookup(
      table_name='recommender_system.customer_features',
      feature_names=['total_purchases_30d'],
      lookup_key='customer_id',
    ),
  ]

fs = FeatureStoreClient()

# df has columns ['customer_id', 'browser', 'rating']
training_set = fs.create_training_set(
  df=df,
  feature_lookups=feature_lookups,
  label='rating',
  exclude_columns=['customer_id']  # 'browser' is not excluded
)

在推理时,FeatureStoreClient.score_batch 中使用的数据帧必须包含 browser 列。

Unity Catalog 中的特征工程

# At inference, 'browser' must be provided
# batch_df has columns ['customer_id', 'browser']
predictions = fe.score_batch(
  model_uri=model_uri,
  df=batch_df
)

工作区特征存储

# At inference, 'browser' must be provided
# batch_df has columns ['customer_id', 'browser']
predictions = fs.score_batch(
  model_uri=model_uri,
  df=batch_df
)

使用 MLflow 加载模型并执行批量推理

使用 FeatureEngineeringClient(适用于 Unity Catalog 中的特征工程)或 FeatureStoreClient(适用于工作区特征存储)的 log_model 方法来记录模型日志后,可以在推理时使用 MLflow。 MLflow.pyfunc.predict 从特征存储中检索特征值,并联接在推理时提供的任何值。 必须提供模型中使用的特征的主键。

注意

使用 MLflow 进行批量推理需要 MLflow 2.11 及更高版本。

# Train model
import mlflow
from sklearn import linear_model

feature_lookups = [
  FeatureLookup(
    table_name='ml.recommender_system.customer_features',
    feature_names=['total_purchases_30d'],
    lookup_key='customer_id',
  ),
  FeatureLookup(
    table_name='ml.recommender_system.product_features',
    feature_names=['category'],
    lookup_key='product_id'
  )
]

fe = FeatureEngineeringClient()

with mlflow.start_run():

  # df has columns ['customer_id', 'product_id', 'rating']
  training_set = fe.create_training_set(
    df=df,
    feature_lookups=feature_lookups,
    label='rating',
    exclude_columns=['customer_id', 'product_id']
  )

  training_df = training_set.load_df().toPandas()

  # "training_df" columns ['total_purchases_30d', 'category', 'rating']
  X_train = training_df.drop(['rating'], axis=1)
  y_train = training_df.rating

  model = linear_model.LinearRegression().fit(X_train, y_train)

  fe.log_model(
    model=model,
    artifact_path="recommendation_model",
    flavor=mlflow.sklearn,
    training_set=training_set,
    registered_model_name="recommendation_model",
    #refers to the default value of "result_type" if not provided at inference
    params={"result_type":"double"},
  )

# Batch inference with MLflow

# NOTE: the result_type parameter can only be used if a default value
# is provided in log_model. This is automatically done for all models
# logged using Databricks Runtime for ML 15.0 or above.
# For earlier Databricks Runtime versions, use set_result as shown below.

# batch_df has columns 'customer_id' and 'product_id'
model = mlflow.pyfunc.load_model(model_version_uri)

# If result_type parameter is provided in log_model
predictions = model.predict(df, {"result_type":"double"})

# If result_type parameter is NOT provided in log_model
model._model_impl.set_result_type("double")
predictions = model.predict(df)