在 Azure Databricks 上启动 Ray 群集

Azure Databricks 通过像处理任何 Apache Spark 作业一样处理群集和作业配置,来简化启动 Ray 群集的过程。 这是因为 Ray 群集实际上是在托管的 Apache Spark 群集上启动的。

在本地计算机上运行 Ray

import ray

ray.init()

在 Azure Databricks 上运行 Ray

from ray.util.spark import setup_ray_cluster
import ray

# If the cluster has four workers with 8 CPUs each as an example
setup_ray_cluster(num_worker_nodes=4, num_cpus_per_worker=8)

# Pass any custom configuration to ray.init
ray.init(ignore_reinit_error=True)

此方法适用于任何群集规模,从几个到数百个节点均可。 Azure Databricks 上的 Ray 群集还支持自动缩放。

创建 Ray 群集后,可以在 Azure Databricks 笔记本中运行任何 Ray 应用程序代码。

重要

Databricks 建议使用 %pip install <your-library-dependency> 为应用程序安装任何必要的库,以确保这些库相应地可供 Ray 群集和应用程序使用。 在 Ray init 函数调用中指定依赖项会将依赖项安装在 Apache Spark 工作器节点无法访问的位置,这会导致版本不兼容和导入错误。

例如,可以在 Azure Databricks 笔记本中运行简单的 Ray 应用程序,如下所示:

import ray
import random
import time
from fractions import Fraction

ray.init()

@ray.remote
def pi4_sample(sample_count):
    """pi4_sample runs sample_count experiments, and returns the
    fraction of time it was inside the circle.
    """
    in_count = 0
    for i in range(sample_count):
        x = random.random()
        y = random.random()
        if x*x + y*y <= 1:
            in_count += 1
    return Fraction(in_count, sample_count)

SAMPLE_COUNT = 1000 * 1000
start = time.time()
future = pi4_sample.remote(sample_count=SAMPLE_COUNT)
pi4 = ray.get(future)
end = time.time()
dur = end - start
print(f'Running {SAMPLE_COUNT} tests took {dur} seconds')

pi = pi4 * 4
print(float(pi))

关闭 Ray 群集

在以下情况下,Ray 群集会自动关闭:

  • 从 Azure Databricks 群集中分离交互式笔记本。
  • Azure Databricks 作业已完成。
  • Azure Databricks 群集已重启或终止。
  • 指定的空闲时间没有活动。

若要关闭在 Azure Databricks 上运行的 Ray 群集,可以调用 ray.utils.spark.shutdown_ray_cluster API。

from ray.utils.spark import shutdown_ray_cluster
import ray

shutdown_ray_cluster()
ray.shutdown()

后续步骤