使用 xgboost.spark
对 XGBoost 模型进行分布式训练
重要
此功能目前以公共预览版提供。
Python 包 xgboost>=1.7 包含新模块 xgboost.spark
。 此模块包含 xgboost PySpark 估算器 xgboost.spark.SparkXGBRegressor
、xgboost.spark.SparkXGBClassifier
和 xgboost.spark.SparkXGBRanker
。 这些新类支持在 SparkML 管道中包含 XGBoost 估算器。 有关 API 详细信息,请参阅 XGBoost Python Spark API 文档。
要求
Databricks Runtime 12.0 ML 和更高版本。
xgboost.spark
参数
xgboost.spark
模块中定义的估算器支持标准 XGBoost 中使用的大多数相同参数和自变量。
- 类构造函数、
fit
方法和predict
方法的参数与xgboost.sklearn
模块中的参数大体相同。 - 命名、值和默认值与 XGBoost 参数中所述基本相同。
- 例外的情况是有几个参数不受支持(例如
gpu_id
、nthread
、sample_weight
、eval_set
),并为pyspark
估算器添加了特定的参数(例如featuresCol
、labelCol
、use_gpu
、validationIndicatorCol
)。 有关详细信息,请参阅 XGBoost Python Spark API 文档。
分布式训练
xgboost.spark
模块中定义的 PySpark 估算器支持使用 num_workers
参数进行分布式 XGBoost 训练。 若要使用分布式训练,请创建分类器或回归器,并将 num_workers
设置为分布式训练期间并发运行的 Spark 任务数。 若要使用所有 Spark 任务槽,请设置 num_workers=sc.defaultParallelism
。
例如:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism)
注意
- 不能将
mlflow.xgboost.autolog
与分布式 XGBoost 一起使用。 若要使用 MLflow 记录 xgboost Spark 模型,请使用mlflow.spark.log_model(spark_xgb_model, artifact_path)
。 - 不能在启用了自动缩放的群集上使用分布式 XGBoost。 以这种弹性缩放范式启动的新工作器节点无法接收新任务集,并会保持空闲状态。 有关禁用自动缩放的说明,请参阅启用自动缩放。
为稀疏特征数据集训练启用优化
xgboost.spark
模块中定义的 PySpark 估算器支持对包含稀疏特征的数据集的训练进行优化。
若要启用稀疏特征集的优化,需要为 fit
方法提供一个数据集(其中包含由 pyspark.ml.linalg.SparseVector
类型的值组成的特征列),并将估算器参数 enable_sparse_data_optim
设置为 True
。 此外,需要将 missing
参数设置为 0.0
。
例如:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(enable_sparse_data_optim=True, missing=0.0)
classifier.fit(dataset_with_sparse_features_col)
GPU 训练
xgboost.spark
模块中定义的 PySpark 估算器支持在 GPU 上进行训练。 将参数 use_gpu
设置为 True
即可启用 GPU 训练。
注意
对于 XGBoost 分布式训练中使用的每个 Spark 任务,当 use_gpu
自变量设置为 True
时,训练中只会使用一个 GPU。 Databricks 建议对 Spark 群集配置 spark.task.resource.gpu.amount
使用默认值 1
。 否则,分配到此 Spark 任务的其他 GPU 将处于空闲状态。
例如:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism, use_gpu=True)
故障排除
在多节点训练期间,如果遇到 NCCL failure: remote process exited or there was a network error
消息,则通常表示 GPU 之间的网络通信存在问题。 当 NCCL(NVIDIA 集体通信库)无法使用某些网络接口进行 GPU 通信时,就会出现此问题。
若要解决此问题,请将群集的 sparkConf 从 spark.executorEnv.NCCL_SOCKET_IFNAME
设置为 eth
。 这实际上是将节点中所有工作器的环境变量 NCCL_SOCKET_IFNAME
设置为 eth
。
示例笔记本
此笔记本演示如何将 Python 包 xgboost.spark
与 Spark MLlib 配合使用。
PySpark-XGBoost 笔记本
已弃用的 sparkdl.xgboost
模块的迁移指南
- 将
from sparkdl.xgboost import XgboostRegressor
替换为from xgboost.spark import SparkXGBRegressor
,将from sparkdl.xgboost import XgboostClassifier
替换为from xgboost.spark import SparkXGBClassifier
。 - 将估算器构造函数中的所有参数名称从 camelCase 样式更改为 snake_case 样式。 例如,将
XgboostRegressor(featuresCol=XXX)
更改为SparkXGBRegressor(features_col=XXX)
。 - 已删除参数
use_external_storage
和external_storage_precision
。xgboost.spark
估算器通过 DMatrix 数据迭代 API 来更有效地使用内存。 不再需要使用低效的外部存储模式。 对于极大的数据集,Databricks 建议增大num_workers
参数,使每个训练任务将数据划分为更小且更易于管理的数据分区。 考虑设置num_workers = sc.defaultParallelism
,它将num_workers
设置为群集中 Spark 任务槽的总数。 - 对于
xgboost.spark
中定义的估算器,设置num_workers=1
会使用单个 Spark 任务执行模型训练。 这可以利用 Spark 群集配置设置spark.task.cpus
指定的 CPU 核心数(默认设置为 1)。 若要使用更多 CPU 核心来训练模型,请增大num_workers
或spark.task.cpus
。 不能为xgboost.spark
中定义的估算器设置nthread
或n_jobs
参数。 此行为不同于已弃用的sparkdl.xgboost
包中定义的估算器的先前行为。
将 sparkdl.xgboost
模型转换为 xgboost.spark
模型
sparkdl.xgboost
模型具有不同于 xgboost.spark
模型的保存格式,并且具有不同的参数设置。 使用以下实用工具函数来转换模型:
def convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls,
sparkdl_xgboost_model,
):
"""
:param xgboost_spark_estimator_cls:
`xgboost.spark` estimator class, e.g. `xgboost.spark.SparkXGBRegressor`
:param sparkdl_xgboost_model:
`sparkdl.xgboost` model instance e.g. the instance of
`sparkdl.xgboost.XgboostRegressorModel` type.
:return
A `xgboost.spark` model instance
"""
def convert_param_key(key):
from xgboost.spark.core import _inverse_pyspark_param_alias_map
if key == "baseMarginCol":
return "base_margin_col"
if key in _inverse_pyspark_param_alias_map:
return _inverse_pyspark_param_alias_map[key]
if key in ['use_external_storage', 'external_storage_precision', 'nthread', 'n_jobs', 'base_margin_eval_set']:
return None
return key
xgboost_spark_params_dict = {}
for param in sparkdl_xgboost_model.params:
if param.name == "arbitraryParamsDict":
continue
if sparkdl_xgboost_model.isDefined(param):
xgboost_spark_params_dict[param.name] = sparkdl_xgboost_model.getOrDefault(param)
xgboost_spark_params_dict.update(sparkdl_xgboost_model.getOrDefault("arbitraryParamsDict"))
xgboost_spark_params_dict = {
convert_param_key(k): v
for k, v in xgboost_spark_params_dict.items()
if convert_param_key(k) is not None
}
booster = sparkdl_xgboost_model.get_booster()
booster_bytes = booster.save_raw("json")
booster_config = booster.save_config()
estimator = xgboost_spark_estimator_cls(**xgboost_spark_params_dict)
sklearn_model = estimator._convert_to_sklearn_model(booster_bytes, booster_config)
return estimator._copyValues(estimator._create_pyspark_model(sklearn_model))
# Example
from xgboost.spark import SparkXGBRegressor
new_model = convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls=SparkXGBRegressor,
sparkdl_xgboost_model=model,
)
如果你有一个包含 sparkdl.xgboost
模型作为最后一个阶段的 pyspark.ml.PipelineModel
模型,则可以将 sparkdl.xgboost
模型的阶段替换为转换后的 xgboost.spark
模型。
pipeline_model.stages[-1] = convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls=SparkXGBRegressor,
sparkdl_xgboost_model=pipeline_model.stages[-1],
)