使用 Azure Databricks,可以在相同的执行环境中运行 Ray 和 Spark 操作,以利用两种分布式计算引擎的优势。
Delta Lake 和 Unity Catalog 支持 Ray 和 Spark 集成,可提供可靠的数据管理、安全访问和世系跟踪。
本文介绍了如何根据以下用例连接 Ray 和 Spark 操作:
- 将 Spark 数据写入 Ray 数据:高效地将内存中的数据传输到 Ray。
- 将 Ray 数据写入 Spark:将 Ray 中的数据输出回 Delta Lake 或其他存储解决方案,以确保兼容性和访问。
- 将外部 Ray 应用程序连接到 Unity Catalog:连接 Databricks 外部的 Ray 应用程序以从 Databricks Unity Catalog 表加载数据。
有关何时使用 Ray、Spark 或两者的详细信息,请参阅何时使用 Spark 与 Ray。
若要从 Spark DataFrame 创建分布式 Ray 数据集,可以使用 ray.data.from_spark()
函数直接从 Ray 读取 Spark DataFrame,无需将数据写入任何位置。
Databricks Runtime ML 15.0 及更高版本上可进行内存中 Spark 到 Ray 的传输。
若要启用此功能,必须执行以下操作:
- 在启动群集之前,将 Spark 群集配置
spark.databricks.pyspark.dataFrameChunk.enabled
设置为true
。
import ray.data
source_table = "my_db.my_table"
# Read a Spark DataFrame from a Delta table in Unity Catalog
df = spark.read.table(source_table)
ray_ds = ray.data.from_spark(df)
警告
自动缩放 Spark 群集(包括使用现成实例的群集)必须将 use_spark_chunk_api
参数设置为 False
才能使用 from_spark()
函数。 否则,API 调用将导致缓存失误,因为执行程序终止时,Spark 执行程序上的缓存会丢失。
ray_ds = ray.data.from_spark(df, use_spark_chunk_api=False)
若要将 Ray 数据写入 Spark,必须将数据集写入 Spark 可访问的位置。
在低于 15.0 的 Databricks Runtime ML 中,可以使用 Ray parquet 编写器 ray_dataset.write_parquet()
从 ray.data
模块直接写入对象存储位置。 Spark 可以使用本机读取器读取此 Parquet 数据。
对于已启用 Unity Catalog 的工作区,请使用 ray.data.Dataset.write_databricks_table
函数写入 Unity Catalog 表。
此函数临时将 Ray 数据集存储在 Unity Catalog 卷中,使用 Spark 从 Unity Catalog 卷读取,最后写入 Unity Catalog 表。 在调用 ray.data.Dataset.write_databricks_table
函数之前,请确保环境变量 "_RAY_UC_VOLUMES_FUSE_TEMP_DIR"
设置为有效且可访问的 Unity Catalog 卷路径,例如 "/Volumes/MyCatalog/MySchema/MyVolume/MyRayData"
。
ds = ray.data
ds.write_databricks_table()
对于未启用 Unity Catalog 的工作区,可以将 Ray 数据数据集手动存储为临时文件(例如 DBFS 中的 Parquet 文件),然后使用 Spark 读取数据文件。
ds.write_parquet(tmp_path)
df = spark.read.parquet(tmp_path)
df.write.format("delta").saveAsTable(table_name)
Azure Databricks 还可将 Ray Core 应用程序与 Spark 集成,使你能够在同一环境中运行 Ray Core(Ray 的较低级别 API)和 Spark 工作负载,并在它们之间启用数据交换。 此集成提供了多种模式来满足不同的工作负载和数据管理需求,确保使用两种框架获得简化的体验。
有三种主要模式用于将数据从 Ray 写入 Spark。
- 将输出保存在临时位置:将 Ray 任务输出临时存储在 DBFS 或 Unity Catalog 卷中,然后再将它们合并到 Spark 数据帧中。
- 使用 Spark Connect 进行连接:将 Ray 任务直接连接到 Spark 群集,使 Ray 能够与 Spark 数据帧和表交互。
- 使用第三方库:使用外部库(如
deltalake
或deltaray
)将数据从 Ray Core 任务写入 Delta Lake 或 Spark 表。
将数据从 Ray 写入 Spark 的最常见模式是将输出数据存储在临时位置,例如 Unity Catalog 卷或 DBFS。 存储数据后,Ray 驱动程序线程将读取工作器节点上的文件的每个部分,并将其合并到最终数据帧中以供进一步处理。 通常,临时文件采用标准格式(如 CSV)。 当输出数据采用表格形式(如由 Ray Core 任务生成的 Pandas 数据帧)时,此方法效果最佳。
当 Ray 任务的输出太大而无法容纳驱动程序节点或共享对象存储的内存时,请使用此方法。 如果需要处理大型数据集而不将数据保存到存储,请考虑增加分配给 Databricks 群集中驱动程序节点的内存以提高性能。
import os
import uuid
import numpy as np
import pandas as pd
@ray.remote
def write_example(task_id, path_prefix):
num_rows = 100
df = pd.DataFrame({
'foo': np.random.rand(num_rows),
'bar': np.random.rand(num_rows)
})
# Write the DataFrame to a CSV file
df.to_csv(os.path.join(path_prefix, f"result_part_{task_id}.csv"))
n_tasks = 10
# Put a unique DBFS prefix for the temporary file path
dbfs_prefix = f"/dbfs/<USERNAME>"
# Create a unique path for the temporary files
path_prefix = os.path.join(dbfs_prefix, f"/ray_tmp/write_task_{uuid.uuid4()}")
tasks = ray.get([write_example.remote(i, path_prefix) for i in range(n_tasks)])
# Read all CSV files in the directory into a single DataFrame
df = spark.read.csv(path_prefix.replace("/dbfs", "dbfs:"), header=True, inferSchema=True)
Ray Core 任务在远程任务中与 Spark 交互的另一种方式是使用 Spark Connect。 这样,可以在 Ray 辅助角色上设置 Spark 上下文,以指向从驱动程序节点运行的 Spark 群集。
若要设置此内容,必须将 Ray 群集资源配置为为 Spark 分配空间。 例如,如果工作器节点有 8 个 CPU,请将 num_cpus_worker_node 设置为 7,为 Spark 留出 1 个 CPU。 对于较大的 Spark 任务,建议分配更大份额的资源。
from databricks.connect import DatabricksSession
import ray
@ray.remote
class SparkHandler(object):
def __init__(self, access_token=None, cluster_id=None, host_url=None):
self.spark = (DatabricksSession
.builder
.remote(host=host_url,
token=access_token,
cluster_id=cluster_id)
.getOrCreate()
)
def test(self):
df = self.spark.sql("select * from samples.nyctaxi.trips")
df.write.format("delta").mode(
"overwrite").saveAsTable("catalog.schema.taxi_trips")
return df.count()
access_token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()
cluster_id = dbutils.notebook.entry_point.getDbutils().notebook().getContext().clusterId().get()
host_url = f"https://{dbutils.notebook.entry_point.getDbutils().notebook().getContext().tags().get('browserHostName').get()}"
sh = SparkHandler.remote(access_token=access_token,
cluster_id=cluster_id,
host_url=host_url)
print(ray.get(sh.test.remote()))
此示例使用笔记本生成的令牌。 然而,Databricks 建议生产用例使用存储在 Databricks 机密中的访问令牌。
由于此过程调用单个 Spark 驱动程序,因此会创建线程锁,导致所有任务等待前面的 Spark 任务完成。 因此,建议在并发任务不多时使用此功能,因为它们在 Spark 任务完成时都会具有连续行为。 对于这些情况,最好保留输出,然后在最后合并为单个 Spark 数据帧,然后写出到输出表。
另一个选项是使用与 Delta Lake 和 Spark 交互的第三方库。 Azure Databricks 不正式支持这些第三方库。
例如,delta-rs
项目中的 deltalake
库。 此方法目前仅适用于 Hive 元存储表,而不适用于 Unity Catalog 表。
from deltalake import DeltaTable, write_deltalake
import pandas as pd
import numpy as np
import ray
@ray.remote
def write_test(table_name):
random_df_id_vals = [int(np.random.randint(1000)), int(np.random.randint(1000))]
pdf = pd.DataFrame({"id": random_df_id_vals, "value": ["foo", "bar"]})
write_deltalake(table_name, pdf, mode="append")
def main():
table_name = "database.mytable"
ray.get([write_test.remote(table_name) for _ in range(100)])
另一个可用的第三方库是通过 Delta Incubator 项目 https://github.com/delta-incubator/deltaray提供的 deltaray 库)
# Standard Libraries
import pathlib
# External Libraries
import deltaray
import deltalake as dl
import pandas as pd
# Creating a Delta Table
cwd = pathlib.Path().resolve()
table_uri = f'{cwd}/tmp/delta-table'
df = pd.DataFrame({'id': [0, 1, 2, 3, 4, ], })
dl.write_deltalake(table_uri, df)
# Reading our Delta Table
ds = deltaray.read_delta(table_uri)
ds.show()
对于 Ray 2.8.0 及更高版本,若要将 Azure Databricks 外部的 Ray 应用程序连接到 Azure Databricks 中的表,可以调用 ray.data.read_databricks_tables
API 来从 Unity Catalog 表加载数据。
首先,将 DATABRICKS_TOKEN
环境变量设置为 SQL 仓库访问令牌。 如果未在 Databricks Runtime 上运行程序,则还可以将 DATABRICKS_HOST
环境变量设置为 Databricks 工作区 URL,如下所示:
export DATABRICKS_HOST=adb-<workspace-id>.<random-number>.databricks.azure.cn
然后,调用 ray.data.read_databricks_tables()
以从 SQL 仓库进行读取。
import ray
ray_dataset = ray.data.read_databricks_tables(
warehouse_id='...', # Databricks SQL warehouse ID
catalog='catalog_1', # Unity Catalog name
schema='db_1', # Schema name
query="SELECT title, score FROM movie WHERE year >= 1980",
)
警告
Databricks 仓库只能缓存查询结果大约 2 小时。 对于长时间运行的工作负载,请调用 ray.data.Dataset.materialize
方法将 Ray 数据集具体化到 Ray 分布式对象存储。
还可以从 Azure Databricks 增量共享表读取数据。 从增量共享表读取比从 Databricks 仓库缓存读取更可靠。
ray.data.read_delta_sharing_tables
API 可在 Ray 2.33 及更高版本上使用。
import ray
ds = ray.data.read_delta_sharing_tables(
url=f"<profile-file-path>#<share-name>.<schema-name>.<table-name>",
limit=100000,
version=1,
)
- 始终使用 Ray 群集最佳做法指南中所述的技术来确保充分利用群集。
- 请考虑使用 Unity Catalog 卷以非表格格式存储输出数据并提供治理。
- 确保
num_cpus_worker_node
配置已设置,以便 CPU 核心数与 Spark 工作器节点的 CPU 核心数匹配。 同样,将num_gpus_worker_node
设置为每个 Spark 工作器节点的 GPU 数量。 在此配置中,每个 Spark 工作器节点启动一个完全利用 Spark 工作器节点资源的 Ray 工作器节点。
Unity Catalog 当前不共享用于从非 Spark 编写器写入表的凭据。 因此,从 Ray Core 任务写入 Unity Catalog 表的所有数据都需要持久保存数据,然后使用 Spark 读取数据,或者必须在 Ray 任务中设置 Databricks Connect。