在 Azure Databricks 上创建并连接到 Ray 群集

了解如何在 Azure Databricks 上创建、配置和运行 Ray 计算群集

要求

若要创建 Ray 群集,必须使用以下设置访问 Databricks 全用途计算资源:

  • Databricks Runtime 12.2 LTS ML 及更高版本。
  • 访问模式必须为“单一用户”或“无隔离共享”。

注意

无服务器计算目前不支持 Ray 群集。

安装 Ray

对于 Databricks Runtime ML 15.0 及更高版本,Ray 预安装在 Azure Databricks 群集上。

对于在 15.0 之前发布的运行时,请使用 pip 在群集上安装 Ray:

%pip install ray[default]>=2.3.0

在 Azure Databricks 群集中创建特定于用户的 Ray 群集

若要创建 Ray 群集,请使用 ray.util.spark.setup_ray_cluster API。

注意

在笔记本中创建 Ray 群集时,它仅对当前笔记本用户可用。 在笔记本与群集分离后或笔记本处于不活动状态 30 分钟动后(没有向 Ray 提交任何任务),Ray 群集会自动关闭。 如果要创建与所有用户共享且不受正在运行的笔记本约束的 Ray 群集,请改用 ray.util.spark.setup_global_ray_cluster API。

固定大小的 Ray 群集

在任何附加到 Azure Databricks 群集的 Azure Databricks 笔记本中,都可以运行以下命令来启动固定大小的 Ray 群集:

from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

setup_ray_cluster(
  num_worker_nodes=2,
  num_cpus_per_node=4,
  collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)

# Pass any custom Ray configuration with ray.init
ray.init(ignore_reinit_error=True)

自动缩放 Ray 群集

若要了解如何启动自动缩放 Ray 群集,请参阅缩放 Azure Databricks 上的 Ray 群集

启动全局模式 Ray 群集

使用 Ray 2.9.0 及更高版本时,可以在 Azure Databricks 群集上创建全局模式 Ray 群集。 使用全局模式 Ray 群集,附加到 Azure Databricks 群集的所有用户也可以使用 Ray 群集。 这一运行 Ray 群集的模式没有单用户群集在运行单用户 Ray 群集实例时具有的活动超时功能。

若要启动多个用户可以附加到并运行 Ray 任务的全局 ray 群集,请先创建 Azure Databricks 笔记本作业并将其附加到共享模式 Azure Databricks 群集,然后运行以下命令:

from ray.util.spark import setup_global_ray_cluster

setup_global_ray_cluster(
  max_worker_nodes=2,
  ...
  # other arguments are the same as with the `setup_global_ray` API.
)

这是一个阻塞调用,在通过单击笔记本命令单元上的“中断”按钮、从 Azure Databricks 群集拆离笔记本或终止 Azure Databricks 群集来中断调用之前,该调用将保持活动状态。 否则,全局模式 Ray 群集将继续运行,并可供授权用户提交任务。 有关全局模式群集的详细信息,请参阅 Ray API 文档

全局模式群集具有以下属性:

  • 在 Azure Databricks 群集中,一次只能创建一个活动全局模式 Ray 群集。
  • 在 Azure Databricks 群集中,任何附加的 Azure Databricks 笔记本中的所有用户都可以使用活动全局模式 Ray 群集。 可以运行 ray.init() 连接到活动全局模式 Ray 群集。 由于多个用户可以访问此 Ray 群集,因此可能出现资源争用问题。
  • 全局模式 Ray 群集在 setup_ray_cluster 调用中断之前保持运行状态。 它不像单用户 Ray 群集那样有自动关闭超时功能。

创建 Ray GPU 群集

对于 GPU 群集,可以通过以下方式将这些资源添加到 Ray 群集:

from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

setup_ray_cluster(
  min_worker_nodes=2,
  max_worker_nodes=4,
  num_cpus_per_node=8,
  num_gpus_per_node=1,
  num_cpus_head_node=8,
  num_gpus_head_node=1,
  collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)

# Pass any custom Ray configuration with ray.init
ray.init(ignore_reinit_error=True)

使用 Ray 客户端连接到远程 Ray 群集

在 Ray 版本 2.3.0 及更高版本中,可以使用 setup_ray_cluster API 创建 Ray 群集,并且在同一笔记本中,可以调用 ray.init() API 来连接到此 Ray 群集。 若要获取远程连接字符串,请使用以下方法:

from ray.util.spark import setup_ray_cluster

_, remote_conn_str = setup_ray_cluster(num_worker_nodes=2, ...)

然后,可以使用上述远程连接字符串连接远程群集:

import ray
ray.init(remote_conn_str)

Ray 客户端不支持在 ray.data 模块中定义的 Ray 数据集 API。 解决方法是,可以将调用 Ray 数据集 API 的代码包装到远程 Ray 任务中,如以下代码所示:

import ray
import pandas as pd

# Note: This must be run in the same VPC/network as the Spark cluster
# so it can reach this address
ray.init("ray://<ray_head_node_ip>:10001")

@ray.remote
def ray_data_task():
    p1 = pd.DataFrame({'a': [3,4] * 10000, 'b': [5,6] * 10000})
    ds = ray.data.from_pandas(p1)
    return ds.repartition(4).to_pandas()

ray.get(ray_data_task.remote())
## Connecting the Ray Cluster to the Ray Job CLI

For many developers moving from self-managed Ray solutions to a <Databricks> solution, there is often existing infrastructure tooling built based on the Ray CLI tools. While <Databricks> currently does not support Ray Cluster CLI integration, the Ray Job CLI can be connected through the driver proxy to the Ray cluster running on <Databricks>. For example:

``` shell
ray job submit  --headers '{"cookie" : "DATAPLANE_DOMAIN_SESSIONID=<REDACTED>"}' --address 'https://<DATABRICKS WORKSPACE URL>/driver-proxy/o/<etc>' --working-dir='.' -- python run_task.py

需要配置的值是 Azure Databricks 工作区 URL(以 https:// 开头),然后是在 Ray 群集启动后在显示的 Ray 仪表板代理 URL 中找到的 /driver-proxy/o/ 后找到的值。

Ray 作业 CLI 用于将作业从外部系统提交到 Ray 群集,但在 Azure Databricks 上的 Ray 群集上提交作业不需要使用它。 建议使用 Azure Databricks 作业来部署作业,为每个应用程序创建一个 Ray 群集,并使用现有的 Azure Databrick 工具(例如 Azure Databricks 资产捆绑包或工作流触发器)来触发作业。

设置日志输出位置

可以设置参数 collect_log_to_path 来指定要将 Ray 群集日志收集到的目标路径。 在关闭 Ray 群集后,日志收集将会运行。

Azure Databricks 建议设置以 /dbfs/ 或 Unity Catalog 卷路径开头的路径来保留日志,即使你终止了 Apache Spark 群集也是如此。 否则,日志不可恢复,因为群集关闭时会删除群集上的本地存储。

创建 Ray 群集后,可以直接在笔记本中运行任何 Ray 应用程序代码。 单击“在新选项卡中打开 Ray 群集仪表板”,查看群集的 Ray 仪表板

在 Ray“仪表板执行组件”页上启用堆栈跟踪和火焰图

在 Ray“仪表板执行组件”页上,可以查看活动 Ray 执行组件的堆栈跟踪和火焰图。 若要查看此信息,请在启动 Ray 群集之前使用以下命令安装“py-spy”:

%pip install py-spy

创建和配置最佳做法

本部分介绍了创建和配置 Ray 群集的最佳做法。

非 GPU 工作负载

Ray 群集在 Azure Databricks Spark 群集上运行。 典型场景是使用 Spark 作业和 Spark UDF 执行不需要 GPU 资源的简单数据预处理任务。 然后,使用 Ray 运行受益于 GPU 的复杂机器学习任务。 在这种情况下,Azure Databricks 建议将 Apache Spark 群集级别配置参数 spark.task.resource.gpu.amount 设置为 0,以便所有 Apache Spark DataFrame 转换和 Apache Spark UDF 执行不使用 GPU 资源。

此配置的优点如下:

  • 它可提高 Apache Spark 作业并行度,因为 GPU 实例类型的 CPU 核心数通常比 GPU 设备多很多。
  • 如果 Apache Spark 群集是与多个用户共享的,则此配置可防止 Apache Spark 作业与同时运行的 Ray 工作负载争用 GPU 资源。

如果在 Ray 任务中使用 MLflow 集成,请禁用 transformers 训练器 mlflow 集成

默认情况下会从 transformers 库中启用 transformers 训练器 MLflow 集成。 如果你使用 Ray 训练来微调 transformers 模型,则 Ray 任务将因凭据问题而失败。 但是,如果直接使用 MLflow 进行训练,则不会出现此问题。 为了避免此问题,可以在启动 Apache Spark 群集时,从 Azure Databricks 群集配置中将 DISABLE_MLFLOW_INTEGRATION 环境变量设置为“TRUE”。

地址 Ray 远程函数选取错误

要运行 Ray 任务,Ray 会腌制任务函数。 如果你发现腌制失败,则必须诊断代码的哪个部分导致失败。 发生腌制错误的常见原因有外部引用的处理、闭包,以及对有状态对象的引用。 通过在任务函数声明中移动 import 语句,可以纠正最容易验证和快速更正的错误之一。

例如,datasets.load_dataset 是在 Azure Databricks Runtime 驱动程序端修补的一个广泛使用的函数,导致引用无法撤消腌制。 若要解决此问题,只需如下所示编写任务函数:

def ray_task_func():
  from datasets import load_dataset  # import the function inside task function
  ...

如果 Ray 任务意外终止并出现内存不足 (OOM) 错误,请禁用 Ray 内存监视器

在 Ray 2.9.3 中,Ray 内存监视器存在几个已知问题,这些问题可能会导致 Ray 任务无故意外停止。 要解决此问题,可以在启动 Apache Spark 群集时通过在 Azure Databricks 群集配置中将环境变量 RAY_memory_monitor_refresh_ms 设置为 0 来禁用 Ray 内存监视器。

从 Ray 读取 Spark 数据

常见用例是将数据从 Spark DataFrame 读取到 Ray 中,以便进一步处理。 在用于机器学习的 Databricks Runtime 15.0 及更高版本中,有一个函数可用于简化将 Spark DataFrame 中包含的数据直接加载到 Ray 中的操作。

若要有效地使用此功能,请确保在运行 ray.init() 以启动 Ray 群集之前,将 Spark 群集配置 spark.databricks.pyspark.dataFrameChunk.enabled 设置为 true

import ray.data

source_table = "my_database.my_table"

spark_dataframe = spark.read.table(source_table)
ray_dataset = ray.data.from_spark(spark_dataframe)

Ray 将直接提取 Spark DataFrame 内容,而无需临时写入数据,使其可直接用于处理。

从 Spark 读取 Ray 数据

与从 Ray 读取 Spark 数据类似,通过使用 Unity 目录支持作为 Spark DataFrame 从 Ray 任务中读取结果的能力。 若要使用此功能,必须在已启用 Unity 目录的工作区中运行用于机器学习的 Databricks Runtime 15.0 及更高版本。

若要使用此功能,请确保环境变量 "_RAY_UC_VOLUMES_FUST_TEMP_DIR" 设置为有效且可访问的 Unity 目录卷路径,例如 "/Volumes/MyCatalog/MySchema/MyVolume/MyRayData"

import ray.data

source_table = "my_database.my_table"

spark_dataframe = spark.read.table(source_table)
ray_dataset = ray.data.from_spark(spark_dataframe)

# Write to the specified (via environment variable) UC Volume from Ray
ray_dataset.write_databricks_table()

在低于用于机器学习的 Databricks Runtime 15.0 版本中,可以使用 Ray parquet 编写器从 ray.data 模块的 ray_dataset.write_parquet() 直接写入对象存储位置。 Spark 可以使用本机读取器读取此 parquet 数据。

将转换函数应用于数据批处理

分批处理数据时,建议将 Ray 数据 API 与 map_batches 函数配合使用。 此方法可以更高效且可缩放,尤其是对于受益于批处理的大型数据集或复杂计算。 任何 Spark DataFrame 都可以使用 ray.data.from_spark API 转换为 Ray 数据集。 可以使用 API ray.data.write_databricks_table 将调用此转换 API 获得的已处理输出写出到 Azure Databricks UC 表。

在 Ray 任务中使用 MLflow

若要在 Ray 任务中使用 MLflow,需要:

  • 在 Ray 任务中定义 Azure Databricks MLflow 凭据。
  • 在 Apache Spark 驱动程序中创建 MLflow 运行并将所创建的 run_id 传递给 Ray 任务。

下面的代码示例演示了如何执行此操作:

import mlflow
import ray
from mlflow.utils.databricks_utils import get_databricks_env_vars
mlflow_db_creds = get_databricks_env_vars("databricks")

experiment_name = "/Users/<your-name> <Databricks>.com/mlflow_test"
mlflow.set_experiment(experiment_name)

@ray.remote
def ray_task(x, run_id):
  import os
  os.environ.update(mlflow_db_creds)
  mlflow.set_experiment(experiment_name)
  # We need to use the run created in <AS> driver side,
  # and set `nested=True` to make it a nested run inside the
  # parent run.
  with mlflow.start_run(run_id=run_id, nested=True):
    mlflow.log_metric(f"task_{x}_metric", x)
  return x

with mlflow.start_run() as run:  # create MLflow run in <AS> driver side.
  results = ray.get([ray_task.remote(x, run.info.run_id) for x in range(10)])

在 Ray 任务中使用以笔记本为作用域的 Python 库或群集 Python 库

目前,Ray 有一个已知问题,即 Ray 任务无法使用笔记本范围的 Python 库或群集 Python 库。 若要在 Ray 作业中利用其他依赖项,必须在启动将在任务中使用这些依赖项的 Ray-on-Spark 群集之前,使用 %pip magic 命令手动安装库。 例如,若要更新将用来启动 Ray 群集的 Ray 版本,可以在笔记本中运行以下命令:

%pip install ray==<The Ray version you want to use> --force-reinstall

然后,在笔记本中运行以下命令以重启 Python 内核:

dbutils.library.restartPython()

后续步骤