在 Azure Databricks 上使用 Ray

重要

此功能目前以公共预览版提供。

通过 Ray 2.3.0 及更高版本,可使用 Azure Databricks 在 Apache Spark 群集上创建 Ray 群集和运行 Ray 应用程序。 有关 Ray 上的机器学习入门信息(包括教程和示例),请参阅 Ray 文档。 有关 Ray 和 Apache Spark 集成的详细信息,请参阅 Spark 上的 Ray API 文档

要求

  • Databricks Runtime 12.0 ML 和更高版本。
  • Databricks Runtime 群集访问模式必须是“已分配”模式或“无隔离共享”模式。

安装 Ray

使用以下命令安装 Ray。 Ray 仪表板组件需要 [default] 扩展。

%pip install ray[default]>=2.3.0

在 Databricks 群集中创建特定于用户的 Ray 群集

若要创建 Ray 群集,请使用 ray.util.spark.setup_ray_cluster API。

在任何附加到 Databricks 群集的 Databricks 笔记本中,都可以运行以下命令:

from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

setup_ray_cluster(
  num_worker_nodes=2,
  num_cpus_per_node=4,
  collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)

ray.util.spark.setup_ray_cluster API 在 Spark 上创建 Ray 群集。 在内部,它会创建一个后台 Spark 作业。 该作业中的每个 Spark 任务创建一个 Ray 工作器节点,Ray 头节点是在驱动程序上创建的。 参数 num_worker_nodes 表示要创建的 Ray 工作器节点数量。 若要指定分配给每个 Ray 工作器节点的 CPU 或 GPU 核心数,请设置参数 num_cpus_per_nodenum_gpus_per_node

创建 Ray 群集后,可以直接在笔记本中运行任何 Ray 应用程序代码。 单击“在新选项卡中打开 Ray 群集仪表板”,查看群集的 Ray 仪表板

提示

如果使用的是 Azure Databricks 单用户群集,可以将 num_worker_nodes 设置为 ray.util.spark.MAX_NUM_WORKER_NODES,以便为 Ray 群集使用所有可用资源。

setup_ray_cluster(
 # ...
 num_worker_nodes=ray.util.spark.MAX_NUM_WORKER_NODES,
)

可以设置参数 collect_log_to_path 来指定要将 Ray 群集日志收集到的目标路径。 在关闭 Ray 群集后,日志收集将会运行。 Databricks 建议设置一个以 /dbfs/ 开头的路径,这样,即使终止 Spark 群集,也会保留日志。 否则,日志不可恢复,因为群集关闭时会删除群集上的本地存储。

注意

“若要让 Ray 应用程序自动使用此 Ray 群集,请调用 ray.util.spark.setup_ray_clusterRAY_ADDRESS 环境变量设置为 Ray 群集的地址。”可以使用 ray.init API 的 address 参数指定替代群集地址。

运行 Ray 应用程序

创建 Ray 群集后,可以在 Azure Databricks 笔记本中运行任何 Ray 应用程序代码。

重要

Databricks 建议使用 %pip install <your-library-dependency> 为应用程序安装任何必要的库,以确保这些库相应地可供 Ray 群集和应用程序使用。 在 Ray init 函数调用中指定依赖项会将依赖项安装在 Spark 工作器节点无法访问的位置,这会导致版本不兼容和导入错误。

例如,可以在 Azure Databricks 笔记本中运行简单的 Ray 应用程序,如下所示:

import ray
import random
import time
from fractions import Fraction

ray.init()

@ray.remote
def pi4_sample(sample_count):
    """pi4_sample runs sample_count experiments, and returns the
    fraction of time it was inside the circle.
    """
    in_count = 0
    for i in range(sample_count):
        x = random.random()
        y = random.random()
        if x*x + y*y <= 1:
            in_count += 1
    return Fraction(in_count, sample_count)

SAMPLE_COUNT = 1000 * 1000
start = time.time()
future = pi4_sample.remote(sample_count=SAMPLE_COUNT)
pi4 = ray.get(future)
end = time.time()
dur = end - start
print(f'Running {SAMPLE_COUNT} tests took {dur} seconds')

pi = pi4 * 4
print(float(pi))

在自动缩放模式下创建 Ray 群集

在 Ray 2.8.0 及更高版本中,在 Databricks 上启动的 Ray 群集支持与 Databricks 自动缩放集成。 请参阅 Databricks 群集自动缩放

使用 Ray 2.8.0 及更高版本,可以在 Databricks 群集上创建一个支持根据工作负载进行纵向扩展或缩减的 Ray 群集。 此自动缩放集成在 Databricks 环境内部触发 Databricks 群集自动缩放。

若要启用自动缩放,请运行以下命令:

from ray.util.spark import setup_ray_cluster

setup_ray_cluster(
  num_worker_nodes=8,
  autoscale=True,
  ... # other arguments
)

如果已启用自动缩放,num_worker_nodes 指示 Ray 工作器节点的最大数目。 Ray 工作器节点的默认最小数量为零。 此默认设置意味着,当 Ray 群集处于空闲状态时,它会纵向缩减到零个 Ray 工作器节点。 这可能并不是所有方案中快速响应的理想方案,但启用后,可以大大降低成本。

在自动缩放模式下,无法将 num_worker_nodes 设置为 ray.util.spark.MAX_NUM_WORKER_NODES

以下参数用于配置纵向扩展和缩减速度:

  • autoscale_upscaling_speed 表示允许挂起的节点数,为当前节点数的倍数。 该值越大,纵向扩展就越激进。 例如,如果此值设置为 1.0,则群集大小可以随时以最大 100% 的速度增长。
  • autoscale_idle_timeout_minutes 表示需要经过多少分钟后,自动缩放程序才可移除空闲工作器节点。 该值越小,纵向缩减就越激进。

使用 Ray 2.9.0 及更高版本,还可以设置 autoscale_min_worker_nodes,以防止 Ray 群集在 Ray 群集处于空闲状态时纵向缩减为零个工作器。

使用 Ray 客户端连接到远程 Ray 群集

在 Ray 2.9.0 中,可以使用 setup_ray_cluster API 创建 Ray 群集,并且在同一笔记本中,可以调用 ray.init() API 来连接到此 Ray 群集。

使用以下方法获取远程连接字符串:

from ray.util.spark import setup_ray_cluster

remote_conn_str = setup_ray_cluster(num_worker_nodes=2, ...)

然后,可以使用上述远程连接字符串连接远程群集:

import ray
ray.init(remote_conn_str)

Ray 客户端不支持在 ray.data 模块中定义的 Ray 数据集 API。 解决方法是,可以将调用 Ray 数据集 API 的代码包装到远程 Ray 任务中,如以下代码所示:

import ray
import pandas as pd
ray.init("ray://<ray_head_node_ip>:10001")

@ray.remote
def ray_data_task():
    p1 = pd.DataFrame({'a': [3,4] * 10000, 'b': [5,6] * 10000})
    ds = ray.data.from_pandas(p1)
    return ds.repartition(4).to_pandas()

ray.get(ray_data_task.remote())

从 Spark 数据帧加载数据

若要将 Spark 数据帧加载为 Ray 数据集,首先必须使用 Parquet 格式将 Spark 数据帧保存到 UC 卷或 Databricks 文件系统(已弃用)。 为了安全地控制 Databricks 文件系统访问,Databricks 建议将云对象存储装载到 DBFS。 然后,可以使用以下帮助器方法从保存的 Spark 数据帧路径创建 ray.data.Dataset 实例:

import ray
import os
from urllib.parse import urlparse

def create_ray_dataset_from_spark_dataframe(spark_dataframe, dbfs_tmp_path):
    spark_dataframe.write.mode('overwrite').parquet(dbfs_tmp_path)
    fuse_path = "/dbfs" + urlparse(dbfs_tmp_path).path
    return ray.data.read_parquet(fuse_path)

# For example, read a Delta Table as a Spark DataFrame
spark_df = spark.read.table("diviner_demo.diviner_pedestrians_data_500")

# Provide a dbfs location to write the table to
data_location_2 = (
    "dbfs:/home/example.user@databricks.com/data/ray_test/test_data_2"
)

# Convert the Spark DataFrame to a Ray dataset
ray_dataset = create_ray_dataset_from_spark_dataframe(
    spark_dataframe=spark_df,
    dbfs_tmp_path=data_location_2
)

通过 Databricks SQL 仓库从 Unity Catalog 表加载数据

对于 Ray 2.8.0 及更高版本,可以调用 ray.data.read_databricks_tables API,以从 Databricks Unity Catalog 表加载数据。

首先,需要将 DATABRICKS_TOKEN 环境变量设置为 Databricks 仓库访问令牌。 如果未在 Databricks Runtime 上运行程序,则还可以将 DATABRICKS_HOST 环境变量设置为 Databricks 工作区 URL,如下所示:

export DATABRICKS_HOST=adb-<workspace-id>.<random-number>.databricks.azure.cn

然后,调用 ray.data.read_databricks_tables() 以从 Databricks SQL 仓库进行读取。

import ray

ray_dataset = ray.data.read_databricks_tables(
    warehouse_id='...',  # Databricks SQL warehouse ID
    catalog='catalog_1',  # Unity catalog name
    schema='db_1',  # Schema name
    query="SELECT title, score FROM movie WHERE year >= 1980",
)

配置 Ray 头节点使用的资源

默认情况下,对于 Spark 上的 Ray 配置,Databricks 会将分配给 Ray 头节点的资源限制为:

  • 0 个 CPU 核心
  • 0 个 GPU
  • 128 MB 堆内存
  • 128 MB 对象存储内存

这是因为 Ray 头节点通常仅用于全局协调,而不用于执行 Ray 任务。 Spark 驱动程序节点资源是与多个用户共享的,因此默认设置将资源保存在 Spark 驱动程序端。

使用 Ray 2.8.0 及更高版本,可以配置 Ray 头节点使用的资源。 在 setup_ray_cluster API 中使用下列参数:

  • num_cpus_head_node:设置 Ray 头节点使用的 CPU 核心数
  • num_gpus_head_node:设置 Ray 头节点使用的 GPU
  • object_store_memory_head_node:按 Ray 头节点设置对象存储内存大小

支持异类群集

为了更高效且经济有效的训练运行,可以创建 Spark 上的 Ray 群集,并在 Ray 头节点和 Ray 工作器节点之间设置不同的配置。 但是,所有 Ray 工作器节点都必须具有相同的配置。 Databricks 群集不完全支持异类群集,但可以通过设置群集策略创建具有不同驱动程序和工作器实例类型的 Databricks 群集。

例如:

{
  "node_type_id": {
    "type": "fixed",
    "value": "i3.xlarge"
  },
  "driver_node_type_id": {
    "type": "fixed",
    "value": "g4dn.xlarge"
  },
  "spark_version": {
    "type": "fixed",
    "value": "13.x-snapshot-gpu-ml-scala2.12"
  }
}

优化 Ray 群集配置

每个 Ray 工作器节点的建议配置是:

  • 每个 Ray 工作器节点至少有 4 个 CPU 核心。
  • 每个 Ray 工作器节点至少有 10GB 堆内存。

因此,在调用 ray.util.spark.setup_ray_cluster 时,Databricks 建议将 num_cpus_per_node 设置为 >=4 的值。

有关为每个 Ray 工作器节点优化堆内存的详细信息,请参阅 Ray 工作器节点的内存分配

Ray 工作器节点的内存分配

每个 Ray 工作器节点使用两种类型的内存:堆内存和对象存储内存。 按如下所述确定每种类型的分配内存大小。

分配给每个 Ray 工作器节点的内存总量为:

RAY_WORKER_NODE_TOTAL_MEMORY = (SPARK_WORKER_NODE_PHYSICAL_MEMORY / MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES * 0.8)

MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES 是可以在 Spark 工作器节点上启动的 Ray 工作器节点的最大数量。 此数量由参数 num_cpus_per_nodenum_gpus_per_node 确定。

如果未设置参数 object_store_memory_per_node,则分配给每个 Ray 工作器节点的堆内存大小和对象存储内存大小为:

RAY_WORKER_NODE_HEAP_MEMORY = RAY_WORKER_NODE_TOTAL_MEMORY * 0.7
OBJECT_STORE_MEMORY_PER_NODE = RAY_WORKER_NODE_TOTAL_MEMORY * 0.3

如果设置了参数 object_store_memory_per_node

RAY_WORKER_NODE_HEAP_MEMORY = RAY_WORKER_NODE_TOTAL_MEMORY - argument_object_store_memory_per_node

此外,每个 Ray 工作器节点的对象存储内存大小受操作系统共享内存的限制。 最大值为:

OBJECT_STORE_MEMORY_PER_NODE_CAP = (SPARK_WORKER_NODE_OS_SHARED_MEMORY / MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES * 0.8)

SPARK_WORKER_NODE_OS_SHARED_MEMORY 是为 Spark 工作器节点配置的 /dev/shm 磁盘大小。

Spark 群集配置提示

Ray 群集在 Databricks Spark 群集上运行。 常见方案是使用 Spark 作业和 Spark UDF 执行不需要 GPU 资源的简单数据预处理任务。 然后,使用 Ray 执行受益于 GPU 的复杂机器学习任务。 在这种情况下,Databricks 建议将 Spark 群集级别配置参数 spark.task.resource.gpu.amount 设置为 0,以便所有 Spark 数据帧转换和 Spark UDF 执行不使用 GPU 资源。

此配置的优点如下:

  • 它可提高 Spark 作业并行度,因为 GPU 实例类型的 CPU 核心数通常比 GPU 设备多很多。
  • 如果 Spark 群集是与多个用户共享的,则此配置可防止 Spark 作业与同时运行的 Ray 工作负载争用 GPU 资源。

在 Ray“仪表板执行组件”页上启用堆栈跟踪和火焰图

在 Ray“仪表板执行组件”页上,可以查看活动 Ray 执行组件的堆栈跟踪和火焰图。 若要查看此信息,请在启动 Ray 群集之前使用以下命令安装“py-spy”:

%pip install py-spy

关闭 Ray 群集

若要关闭 Azure Databricks 上运行的 Ray 群集,可以调用 ray.utils.spark.shutdown_ray_cluster API。

注意

Ray 群集在以下情况下也会关闭:

  • 从 Azure Databricks 群集中分离交互式笔记本。
  • Azure Databricks 作业已完成。
  • Azure Databricks 群集已重启或终止。
  • 指定的空闲时间没有活动。

示例笔记本

以下笔记本演示如何在 Databricks 上创建 Ray 群集和运行 Ray 应用程序。

Spark 上的 Ray 初学者笔记本

获取笔记本

限制

  • 不支持多用户共享 Azure Databricks 群集(启用隔离模式)。
  • 使用 %pip 安装包时,Ray 群集将会关闭。 请确保在使用 %pip 安装完所有库后启动 Ray。
  • 使用替代 ray.util.spark.setup_ray_cluster 中的配置的集成可能会导致 Ray 群集变得不稳定,并可能导致 Ray 上下文崩溃。 例如,使用 xgboost_ray 包,并使用超过 Ray 群集配置的执行组件或 cpus_per_actor 配置设置 RayParams,可能会导致 Ray 群集以无提示方式崩溃。