使用 xgboost.spark 对 XGBoost 模型进行分布式训练

重要

此功能目前以公共预览版提供。

Python 包 xgboost>=1.7 包含新模块 xgboost.spark。 此模块包含 xgboost PySpark 估算器 xgboost.spark.SparkXGBRegressorxgboost.spark.SparkXGBClassifierxgboost.spark.SparkXGBRanker。 这些新类支持在 SparkML 管道中包含 XGBoost 估算器。 有关 API 详细信息,请参阅 XGBoost Python Spark API 文档

要求

Databricks Runtime 12.0 ML 和更高版本。

xgboost.spark 参数

xgboost.spark 模块中定义的估算器支持标准 XGBoost 中使用的大多数相同参数和自变量。

  • 类构造函数、fit 方法和 predict 方法的参数与 xgboost.sklearn 模块中的参数大体相同。
  • 命名、值和默认值与 XGBoost 参数中所述基本相同。
  • 例外的情况是有几个参数不受支持(例如 gpu_idnthreadsample_weighteval_set),并为 pyspark 估算器添加了特定的参数(例如 featuresCollabelColuse_gpuvalidationIndicatorCol)。 有关详细信息,请参阅 XGBoost Python Spark API 文档

分布式训练

xgboost.spark 模块中定义的 PySpark 估算器支持使用 num_workers 参数进行分布式 XGBoost 训练。 若要使用分布式训练,请创建分类器或回归器,并将 num_workers 设置为分布式训练期间并发运行的 Spark 任务数。 若要使用所有 Spark 任务槽,请设置 num_workers=sc.defaultParallelism

例如:

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism)

注意

  • 不能将 mlflow.xgboost.autolog 与分布式 XGBoost 一起使用。 若要使用 MLflow 记录 xgboost Spark 模型,请使用 mlflow.spark.log_model(spark_xgb_model, artifact_path)
  • 不能在启用了自动缩放的群集上使用分布式 XGBoost。 以这种弹性缩放范式启动的新工作器节点无法接收新任务集,并会保持空闲状态。 有关禁用自动缩放的说明,请参阅启用自动缩放

为稀疏特征数据集训练启用优化

xgboost.spark 模块中定义的 PySpark 估算器支持对包含稀疏特征的数据集的训练进行优化。 若要启用稀疏特征集的优化,需要为 fit 方法提供一个数据集(其中包含由 pyspark.ml.linalg.SparseVector 类型的值组成的特征列),并将估算器参数 enable_sparse_data_optim 设置为 True。 此外,需要将 missing 参数设置为 0.0

例如:

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(enable_sparse_data_optim=True, missing=0.0)
classifier.fit(dataset_with_sparse_features_col)

GPU 训练

xgboost.spark 模块中定义的 PySpark 估算器支持在 GPU 上进行训练。 将参数 use_gpu 设置为 True 即可启用 GPU 训练。

注意

对于 XGBoost 分布式训练中使用的每个 Spark 任务,当 use_gpu 自变量设置为 True 时,训练中只会使用一个 GPU。 Databricks 建议对 Spark 群集配置 spark.task.resource.gpu.amount 使用默认值 1。 否则,分配到此 Spark 任务的其他 GPU 将处于空闲状态。

例如:

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism, use_gpu=True)

故障排除

在多节点训练期间,如果遇到 NCCL failure: remote process exited or there was a network error 消息,则通常表示 GPU 之间的网络通信存在问题。 当 NCCL(NVIDIA 集体通信库)无法使用某些网络接口进行 GPU 通信时,就会出现此问题。

若要解决此问题,请将群集的 sparkConf 从 spark.executorEnv.NCCL_SOCKET_IFNAME 设置为 eth。 这实际上是将节点中所有工作器的环境变量 NCCL_SOCKET_IFNAME 设置为 eth

示例笔记本

此笔记本演示如何将 Python 包 xgboost.spark 与 Spark MLlib 配合使用。

PySpark-XGBoost 笔记本

获取笔记本

已弃用的 sparkdl.xgboost 模块的迁移指南

  • from sparkdl.xgboost import XgboostRegressor 替换为 from xgboost.spark import SparkXGBRegressor,将 from sparkdl.xgboost import XgboostClassifier 替换为 from xgboost.spark import SparkXGBClassifier
  • 将估算器构造函数中的所有参数名称从 camelCase 样式更改为 snake_case 样式。 例如,将 XgboostRegressor(featuresCol=XXX) 更改为 SparkXGBRegressor(features_col=XXX)
  • 已删除参数 use_external_storageexternal_storage_precisionxgboost.spark 估算器通过 DMatrix 数据迭代 API 来更有效地使用内存。 不再需要使用低效的外部存储模式。 对于极大的数据集,Databricks 建议增大 num_workers 参数,使每个训练任务将数据划分为更小且更易于管理的数据分区。 考虑设置 num_workers = sc.defaultParallelism,它将 num_workers 设置为群集中 Spark 任务槽的总数。
  • 对于 xgboost.spark 中定义的估算器,设置 num_workers=1 会使用单个 Spark 任务执行模型训练。 这可以利用 Spark 群集配置设置 spark.task.cpus 指定的 CPU 核心数(默认设置为 1)。 若要使用更多 CPU 核心来训练模型,请增大 num_workersspark.task.cpus。 不能为 xgboost.spark 中定义的估算器设置 nthreadn_jobs 参数。 此行为不同于已弃用的 sparkdl.xgboost 包中定义的估算器的先前行为。

sparkdl.xgboost 模型转换为 xgboost.spark 模型

sparkdl.xgboost 模型具有不同于 xgboost.spark 模型的保存格式,并且具有不同的参数设置。 使用以下实用工具函数来转换模型:

def convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls,
  sparkdl_xgboost_model,
):
  """
  :param xgboost_spark_estimator_cls:
      `xgboost.spark` estimator class, e.g. `xgboost.spark.SparkXGBRegressor`
  :param sparkdl_xgboost_model:
      `sparkdl.xgboost` model instance e.g. the instance of
       `sparkdl.xgboost.XgboostRegressorModel` type.

  :return
      A `xgboost.spark` model instance
  """

  def convert_param_key(key):
    from xgboost.spark.core import _inverse_pyspark_param_alias_map
    if key == "baseMarginCol":
      return "base_margin_col"
    if key in _inverse_pyspark_param_alias_map:
      return _inverse_pyspark_param_alias_map[key]
    if key in ['use_external_storage', 'external_storage_precision', 'nthread', 'n_jobs', 'base_margin_eval_set']:
      return None
    return key

  xgboost_spark_params_dict = {}
  for param in sparkdl_xgboost_model.params:
    if param.name == "arbitraryParamsDict":
      continue
    if sparkdl_xgboost_model.isDefined(param):
      xgboost_spark_params_dict[param.name] = sparkdl_xgboost_model.getOrDefault(param)

  xgboost_spark_params_dict.update(sparkdl_xgboost_model.getOrDefault("arbitraryParamsDict"))

  xgboost_spark_params_dict = {
    convert_param_key(k): v
    for k, v in xgboost_spark_params_dict.items()
    if convert_param_key(k) is not None
  }

  booster = sparkdl_xgboost_model.get_booster()
  booster_bytes = booster.save_raw("json")
  booster_config = booster.save_config()
  estimator = xgboost_spark_estimator_cls(**xgboost_spark_params_dict)
  sklearn_model = estimator._convert_to_sklearn_model(booster_bytes, booster_config)
  return estimator._copyValues(estimator._create_pyspark_model(sklearn_model))

# Example
from xgboost.spark import SparkXGBRegressor

new_model = convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls=SparkXGBRegressor,
  sparkdl_xgboost_model=model,
)

如果你有一个包含 sparkdl.xgboost 模型作为最后一个阶段的 pyspark.ml.PipelineModel 模型,则可以将 sparkdl.xgboost 模型的阶段替换为转换后的 xgboost.spark 模型。

pipeline_model.stages[-1] = convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls=SparkXGBRegressor,
  sparkdl_xgboost_model=pipeline_model.stages[-1],
)