在 Azure Databricks 上启动 Ray 群集
Azure Databricks 通过像处理任何 Apache Spark 作业一样处理群集和作业配置,来简化启动 Ray 群集的过程。 这是因为 Ray 群集实际上是在托管的 Apache Spark 群集上启动的。
在本地计算机上运行 Ray
import ray
ray.init()
在 Azure Databricks 上运行 Ray
from ray.util.spark import setup_ray_cluster
import ray
# If the cluster has four workers with 8 CPUs each as an example
setup_ray_cluster(num_worker_nodes=4, num_cpus_per_worker=8)
# Pass any custom configuration to ray.init
ray.init(ignore_reinit_error=True)
此方法适用于任何群集规模,从几个到数百个节点均可。 Azure Databricks 上的 Ray 群集还支持自动缩放。
创建 Ray 群集后,可以在 Azure Databricks 笔记本中运行任何 Ray 应用程序代码。
重要
Databricks 建议使用 %pip install <your-library-dependency>
为应用程序安装任何必要的库,以确保这些库相应地可供 Ray 群集和应用程序使用。 在 Ray init 函数调用中指定依赖项会将依赖项安装在 Apache Spark 工作器节点无法访问的位置,这会导致版本不兼容和导入错误。
例如,可以在 Azure Databricks 笔记本中运行简单的 Ray 应用程序,如下所示:
import ray
import random
import time
from fractions import Fraction
ray.init()
@ray.remote
def pi4_sample(sample_count):
"""pi4_sample runs sample_count experiments, and returns the
fraction of time it was inside the circle.
"""
in_count = 0
for i in range(sample_count):
x = random.random()
y = random.random()
if x*x + y*y <= 1:
in_count += 1
return Fraction(in_count, sample_count)
SAMPLE_COUNT = 1000 * 1000
start = time.time()
future = pi4_sample.remote(sample_count=SAMPLE_COUNT)
pi4 = ray.get(future)
end = time.time()
dur = end - start
print(f'Running {SAMPLE_COUNT} tests took {dur} seconds')
pi = pi4 * 4
print(float(pi))
关闭 Ray 群集
在以下情况下,Ray 群集会自动关闭:
- 从 Azure Databricks 群集中分离交互式笔记本。
- Azure Databricks 作业已完成。
- Azure Databricks 群集已重启或终止。
- 指定的空闲时间没有活动。
若要关闭在 Azure Databricks 上运行的 Ray 群集,可以调用 ray.utils.spark.shutdown_ray_cluster
API。
from ray.utils.spark import shutdown_ray_cluster
import ray
shutdown_ray_cluster()
ray.shutdown()