HorovodRunner: distributed deep learning with Horovod

HorovodRunner is a general API to run distributed deep learning workloads on Azure Databricks using Uber's Horovod framework. By integrating Horovod with Spark's barrier mode, Azure Databricks is able to provide higher stability for long-running deep learning training jobs on Spark. HorovodRunner takes a Python method that contains DL training code with Horovod hooks. This method gets pickled on the driver and sent to Spark workers. A Horovod MPI job is embedded as a Spark job using barrier execution mode. The first executor collects the IP addresses of all task executors using BarrierTaskContext and triggers a Horovod job using mpirun. Each Python MPI process loads the pickled program back, deserializes it, and runs it.


Distributed training with HorovodRunner

HorovodRunner provides the ability to launch Horovod training jobs as Spark jobs. The HorovodRunner API supports the following methods:

init(self, np)

Create an instance of HorovodRunner.

run(self, main, **kwargs)

Run a Horovod training job invoking main(**kwargs). Both the main function and the keyword arguments are serialized using cloudpickle and distributed to cluster workers.

For details, see the HorovodRunner API documentation.

The general approach to developing a distributed training program using HorovodRunner is:

  1. Create a HorovodRunner instance initialized with the number of nodes.
  2. Define a Horovod training method following the approach described in Horovod usage, making sure to add any import statements inside the method.
  3. Pass the training method to the HorovodRunner instance.

For example:

from sparkdl import HorovodRunner

hr = HorovodRunner(np=2)

def train():
  # Import libraries inside the training method so they resolve after the
  # method is unpickled on each worker.
  import tensorflow as tf
  import horovod.tensorflow as hvd
  hvd.init()

hr.run(train)

To run HorovodRunner on the driver only with n subprocesses, use hr = HorovodRunner(np=-n). For example, if there are 4 GPUs on the driver node, you can choose n up to 4. You can find details about the parameter np in the HorovodRunner API documentation, and details about how to pin one GPU per subprocess in the Horovod usage guide.
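For instance, a minimal local-mode sketch, reusing the train function defined above and assuming the driver node has at least 4 GPUs:

# Run Horovod in 4 subprocesses on the driver only (no Spark workers involved).
hr_local = HorovodRunner(np=-4)
hr_local.run(train)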

A common error is that TensorFlow objects cannot be found or pickled. This happens when the library import statements are not distributed to the other executors. To avoid this issue, make sure that all import statements (for example, import tensorflow as tf) appear both at the top of the Horovod training method and inside any other user-defined functions referenced in the Horovod training method.
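As a sketch, a hypothetical helper such as build_model below repeats its own import, so it still resolves after the training method and everything it references are unpickled on a worker:

def build_model():
  # Repeat the import here: this helper is also pickled and shipped to the workers.
  import tensorflow as tf
  return tf.keras.Sequential([tf.keras.layers.Dense(10)])

def train():
  import tensorflow as tf
  import horovod.tensorflow as hvd
  hvd.init()
  model = build_model()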

Record Horovod training with Horovod Timeline

Horovod has the ability to record the timeline of its activity, called Horovod Timeline.

Important

Horovod Timeline has a significant impact on performance. Inception3 throughput can decrease by ~40% when Horovod Timeline is enabled. If you want to speed up your HorovodRunner jobs, disable Horovod Timeline.

To record a Horovod Timeline, set the HOROVOD_TIMELINE environment variable to the location of the timeline file to be created. The location should be on shared storage (for example, using DBFS local file APIs) so that the timeline file can be easily retrieved. For example:

import os
import uuid

timeline_dir = "/dbfs/ml/horovod-timeline/%s" % uuid.uuid4()
os.makedirs(timeline_dir)
os.environ['HOROVOD_TIMELINE'] = timeline_dir + "/horovod_timeline.json"
# run_training_horovod is your Horovod training function; params are its keyword arguments.
hr = HorovodRunner(np=4)
hr.run(run_training_horovod, params=params)

You can download the timeline file using the Databricks CLI or FileStore, and then use the Chrome browser's chrome://tracing facility to view it. For example:

Horovod timeline

Development workflow

To migrate single-node DL code to distributed training, the following are the broad steps in the workflow.

  1. Prepare single node DL code: Prepare and test the single node DL code with TensorFlow, Keras, or PyTorch.
  2. Migrate to Horovod: Follow the instructions from Horovod usage to migrate the code with Horovod and test it on the driver:
    1. Add hvd.init() to initialize Horovod.
    2. Pin a server GPU to be used by this process using config.gpu_options.visible_device_list. With the typical setup of one GPU per process, this can be set to the local rank. In that case, the first process on the server is allocated the first GPU, the second process is allocated the second GPU, and so forth.
    3. Include a shard of the dataset. This dataset operator is very useful when running distributed training, as it allows each worker to read a unique subset.
    4. Scale the learning rate by the number of workers. The effective batch size in synchronous distributed training is scaled by the number of workers; an increase in learning rate compensates for the increased batch size.
    5. Wrap the optimizer in hvd.DistributedOptimizer. The distributed optimizer delegates gradient computation to the original optimizer, averages gradients using allreduce or allgather, and then applies those averaged gradients.
    6. Add hvd.BroadcastGlobalVariablesHook(0) to broadcast initial variable states from rank 0 to all other processes. This is necessary to ensure consistent initialization of all workers when training is started with random weights or restored from a checkpoint. Alternatively, if you are not using MonitoredTrainingSession, you can execute the hvd.broadcast_global_variables operation after global variables have been initialized.
    7. Modify your code to save checkpoints only on worker 0 to prevent other workers from corrupting them. This can be accomplished by passing checkpoint_dir=None to tf.train.MonitoredTrainingSession if hvd.rank() != 0.
  3. Migrate to HorovodRunner: HorovodRunner runs the Horovod training job by invoking a Python function. You must wrap the main training procedure into one function; see the sketch after this list. Then you can test HorovodRunner in local mode and distributed mode.
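A minimal sketch of such a wrapped training function is shown below. It assumes TensorFlow 2.x with the Keras API and horovod.tensorflow.keras, so it uses the Keras equivalents of the constructs named above (tf.config GPU pinning instead of config.gpu_options.visible_device_list, and hvd.callbacks.BroadcastGlobalVariablesCallback plus a rank-0 ModelCheckpoint instead of the session hooks). The function name, learning_rate parameter, model, dataset pipeline, and checkpoint path are placeholders for your own code.

def train_hvd(learning_rate=0.001):
  # All imports live inside the method so they resolve after unpickling on the workers.
  import tensorflow as tf
  import horovod.tensorflow.keras as hvd

  # Step 1: initialize Horovod.
  hvd.init()

  # Step 2: pin one GPU per process, indexed by the local rank.
  gpus = tf.config.experimental.list_physical_devices('GPU')
  if gpus:
    tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')

  # Placeholder data pipeline: MNIST, normalized to [0, 1].
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  x_train = x_train[..., None] / 255.0

  # Step 3: shard the dataset so each worker reads a unique subset.
  dataset = (tf.data.Dataset.from_tensor_slices((x_train, y_train))
             .shard(hvd.size(), hvd.rank())
             .shuffle(10000)
             .batch(128))

  # Placeholder model.
  model = tf.keras.Sequential([
      tf.keras.layers.Flatten(input_shape=(28, 28, 1)),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10, activation='softmax'),
  ])

  # Steps 4 and 5: scale the learning rate by the number of workers and
  # wrap the optimizer in hvd.DistributedOptimizer.
  optimizer = hvd.DistributedOptimizer(
      tf.keras.optimizers.Adam(learning_rate * hvd.size()))
  model.compile(optimizer=optimizer,
                loss='sparse_categorical_crossentropy',
                metrics=['accuracy'])

  # Step 6: broadcast the initial variable states from rank 0 to all other processes.
  callbacks = [hvd.callbacks.BroadcastGlobalVariablesCallback(0)]

  # Step 7: save checkpoints only on worker 0 (the path is a placeholder).
  if hvd.rank() == 0:
    callbacks.append(tf.keras.callbacks.ModelCheckpoint('/dbfs/ml/mnist-checkpoint-{epoch}.h5'))

  model.fit(dataset, epochs=2, callbacks=callbacks,
            verbose=2 if hvd.rank() == 0 else 0)

You can then launch this function on two workers with HorovodRunner(np=2).run(train_hvd, learning_rate=0.001), or debug it locally on the driver by passing a negative np.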

Update the deep learning libraries

If you upgrade or downgrade TensorFlow, Keras, or PyTorch, you must reinstall Horovod so that it is compiled against the newly installed library. For example, if you want to upgrade TensorFlow, Databricks recommends using the init script from the TensorFlow installation instructions and appending the following TensorFlow-specific Horovod installation code to the end of it. See the Horovod installation instructions to work with different combinations, such as upgrading or downgrading PyTorch and other libraries.

add-apt-repository -y ppa:ubuntu-toolchain-r/test
apt update
# Use the same compiler that TensorFlow was built with to compile Horovod
apt install g++-7 -y
update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-7 60 --slave /usr/bin/g++ g++ /usr/bin/g++-7

HOROVOD_GPU_ALLREDUCE=NCCL HOROVOD_CUDA_HOME=/usr/local/cuda pip install horovod==0.18.1 --force-reinstall --no-deps --no-cache-dir

Examples

The following examples, based on the MNIST dataset, demonstrate how to migrate a single-node deep learning program to distributed deep learning with HorovodRunner.