使用
重要
此功能目前以公共预览版提供。
Python 包 xgboost>=1.7 包含新模块 xgboost.spark。 此模块包含 xgboost PySpark 估算器 xgboost.spark.SparkXGBRegressor、xgboost.spark.SparkXGBClassifier 和 xgboost.spark.SparkXGBRanker。 这些新类支持在 SparkML 管道中包含 XGBoost 估算器。 有关 API 详细信息,请参阅 XGBoost Python Spark API 文档。
要求
Databricks Runtime 12.0 ML 和更高版本。
xgboost.spark 参数
xgboost.spark 模块中定义的估算器支持标准 XGBoost 中使用的大多数相同参数和自变量。
- 类构造函数、
fit方法和predict方法的参数与xgboost.sklearn模块中的参数大体相同。 - 命名、值和默认值与 XGBoost 参数中所述基本相同。
- 例外的情况是有几个参数不受支持(例如
gpu_id、nthread、sample_weight、eval_set),并为pyspark估算器添加了特定的参数(例如featuresCol、labelCol、use_gpu、validationIndicatorCol)。 有关详细信息,请参阅 XGBoost Python Spark API 文档。
分布式训练
xgboost.spark 模块中定义的 PySpark 估算器支持使用 num_workers 参数进行分布式 XGBoost 训练。 若要使用分布式训练,请创建分类器或回归器,并将 num_workers 设置为分布式训练期间并发运行的 Spark 任务数。 若要使用所有 Spark 任务槽,请设置 num_workers=sc.defaultParallelism。
例如:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism)
注意
- 不能将
mlflow.xgboost.autolog与分布式 XGBoost 一起使用。 若要使用 MLflow 记录 xgboost Spark 模型,请使用mlflow.spark.log_model(spark_xgb_model, artifact_path)。 - 不能在启用了自动缩放的群集上使用分布式 XGBoost。 以这种弹性缩放范式启动的新工作器节点无法接收新任务集,并会保持空闲状态。 有关禁用自动缩放的说明,请参阅启用自动缩放。
为稀疏特征数据集训练启用优化
xgboost.spark 模块中定义的 PySpark 估算器支持对包含稀疏特征的数据集的训练进行优化。
若要启用稀疏特征集的优化,需要为 fit 方法提供一个数据集(其中包含由 pyspark.ml.linalg.SparseVector 类型的值组成的特征列),并将估算器参数 enable_sparse_data_optim 设置为 True。 此外,需要将 missing 参数设置为 0.0。
例如:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(enable_sparse_data_optim=True, missing=0.0)
classifier.fit(dataset_with_sparse_features_col)
GPU 训练
xgboost.spark 模块中定义的 PySpark 估算器支持在 GPU 上进行训练。 将参数 use_gpu 设置为 True 即可启用 GPU 训练。
注意
对于 XGBoost 分布式训练中使用的每个 Spark 任务,当 use_gpu 自变量设置为 True 时,训练中只会使用一个 GPU。 Databricks 建议对 Spark 群集配置 1 使用默认值 spark.task.resource.gpu.amount。 否则,分配到此 Spark 任务的其他 GPU 将处于空闲状态。
例如:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism, use_gpu=True)
故障排除
在多节点训练期间,如果遇到 NCCL failure: remote process exited or there was a network error 消息,则通常表示 GPU 之间的网络通信存在问题。 当 NCCL(NVIDIA 集体通信库)无法使用某些网络接口进行 GPU 通信时,就会出现此问题。
若要解决此问题,请将群集的 sparkConf 从 spark.executorEnv.NCCL_SOCKET_IFNAME 设置为 eth。 这实际上是将节点中所有工作器的环境变量 NCCL_SOCKET_IFNAME 设置为 eth。
示例笔记本
此笔记本演示如何将 Python 包 xgboost.spark 与 Spark MLlib 配合使用。
PySpark-XGBoost 笔记本
已弃用的 sparkdl.xgboost 模块的迁移指南
- 将
from sparkdl.xgboost import XgboostRegressor替换为from xgboost.spark import SparkXGBRegressor,将from sparkdl.xgboost import XgboostClassifier替换为from xgboost.spark import SparkXGBClassifier。 - 将估算器构造函数中的所有参数名称从 camelCase 样式更改为 snake_case 样式。 例如,将
XgboostRegressor(featuresCol=XXX)更改为SparkXGBRegressor(features_col=XXX)。 - 已删除参数
use_external_storage和external_storage_precision。xgboost.spark估算器通过 DMatrix 数据迭代 API 来更有效地使用内存。 不再需要使用低效的外部存储模式。 对于极大的数据集,Databricks 建议增大num_workers参数,使每个训练任务将数据划分为更小且更易于管理的数据分区。 考虑设置num_workers = sc.defaultParallelism,它将num_workers设置为群集中 Spark 任务槽的总数。 - 对于
xgboost.spark中定义的估算器,设置num_workers=1会使用单个 Spark 任务执行模型训练。 这可以利用 Spark 群集配置设置spark.task.cpus指定的 CPU 核心数(默认设置为 1)。 若要使用更多 CPU 核心来训练模型,请增大num_workers或spark.task.cpus。 不能为nthread中定义的估算器设置n_jobs或xgboost.spark参数。 此行为不同于已弃用的sparkdl.xgboost包中定义的估算器的先前行为。
将 sparkdl.xgboost 模型转换为 xgboost.spark 模型
sparkdl.xgboost 模型具有不同于 xgboost.spark 模型的保存格式,并且具有不同的参数设置。 使用以下实用工具函数来转换模型:
def convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls,
sparkdl_xgboost_model,
):
"""
:param xgboost_spark_estimator_cls:
`xgboost.spark` estimator class, e.g. `xgboost.spark.SparkXGBRegressor`
:param sparkdl_xgboost_model:
`sparkdl.xgboost` model instance e.g. the instance of
`sparkdl.xgboost.XgboostRegressorModel` type.
:return
A `xgboost.spark` model instance
"""
def convert_param_key(key):
from xgboost.spark.core import _inverse_pyspark_param_alias_map
if key == "baseMarginCol":
return "base_margin_col"
if key in _inverse_pyspark_param_alias_map:
return _inverse_pyspark_param_alias_map[key]
if key in ['use_external_storage', 'external_storage_precision', 'nthread', 'n_jobs', 'base_margin_eval_set']:
return None
return key
xgboost_spark_params_dict = {}
for param in sparkdl_xgboost_model.params:
if param.name == "arbitraryParamsDict":
continue
if sparkdl_xgboost_model.isDefined(param):
xgboost_spark_params_dict[param.name] = sparkdl_xgboost_model.getOrDefault(param)
xgboost_spark_params_dict.update(sparkdl_xgboost_model.getOrDefault("arbitraryParamsDict"))
xgboost_spark_params_dict = {
convert_param_key(k): v
for k, v in xgboost_spark_params_dict.items()
if convert_param_key(k) is not None
}
booster = sparkdl_xgboost_model.get_booster()
booster_bytes = booster.save_raw("json")
booster_config = booster.save_config()
estimator = xgboost_spark_estimator_cls(**xgboost_spark_params_dict)
sklearn_model = estimator._convert_to_sklearn_model(booster_bytes, booster_config)
return estimator._copyValues(estimator._create_pyspark_model(sklearn_model))
# Example
from xgboost.spark import SparkXGBRegressor
new_model = convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls=SparkXGBRegressor,
sparkdl_xgboost_model=model,
)
如果你有一个包含 pyspark.ml.PipelineModel 模型作为最后一个阶段的 sparkdl.xgboost 模型,则可以将 sparkdl.xgboost 模型的阶段替换为转换后的 xgboost.spark 模型。
pipeline_model.stages[-1] = convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls=SparkXGBRegressor,
sparkdl_xgboost_model=pipeline_model.stages[-1],
)