将工作负荷从经典计算迁移到无服务器计算。 无服务器计算会自动处理预配、缩放、运行时升级和优化。
大多数经典工作负荷只需进行最少或没有代码更改即可迁移。 本页重点介绍这些工作负载。 某些功能(例如 df.cache,在无服务器)上尚不受支持,但一旦可用,就不需要更改代码。 依赖于 R 或 Scala 笔记本的某些工作负荷需要经典计算,并且无法迁移到无服务器。 有关当前限制的完整列表,请参阅 无服务器计算限制。
迁移步骤
若要将工作负荷从经典计算迁移到无服务器计算,请执行以下步骤:
- 检查先决条件:验证工作区、网络和云存储访问是否符合要求。 请参阅准备工作。
- 更新代码:进行任何必要的代码和配置更改。 请参阅 “更新代码”。
- 测试工作负载:在切换之前验证兼容性和正确性。 请参阅 “测试工作负荷”。
- 选择性能模式:选择最符合工作负荷要求的性能模式。 请参阅 “选择性能模式”。
- 分阶段迁移:从新的低风险工作负载开始,以增量方式推出无服务器工作负荷。 请参阅 分阶段迁移。
- 监视成本:跟踪无服务器 DBU 消耗并设置警报。 请参阅 “监视成本”。
在您开始之前
在开始迁移之前,可能需要更新工作区中的某些旧配置。
| 先决条件 | Action | 详细信息 |
|---|---|---|
| 启用 Unity Catalog 工作区 | 如有需要,从 Hive Metastore 迁移 | 将Azure Databricks工作区更新到 Unity 目录 |
| 已配置网络 | 将 VPC 对等连接 (VPC peering) 替换为网络控制中心 (NCC)、专用链接 或防火墙规则 | 无服务器计算平面网络 |
| 云存储访问 | 将旧数据访问模式替换为 Unity 目录外部位置 | 使用 Unity Catalog 连接到云对象存储 |
确认工作区位于 受支持的区域中。
更新代码
以下部分列出了使工作负载与无服务器兼容的代码和配置更改。
数据存取
无服务器架构不支持旧数据访问模式。 请更新代码以使用 Unity Catalog。
| 经典模式 | 无服务器替代方案 | 详细信息 |
|---|---|---|
DBFS 路径 (dbfs:/...) |
Unity Catalog 卷 | 什么是 Unity Catalog 容量? |
| Hive Metastore 表 | Unity Catalog 表(或 HMS Federation) | 将Azure Databricks工作区更新到 Unity 目录 |
| 存储帐户凭据 | Unity Catalog 的外部位置 | 使用 Unity Catalog 连接到云对象存储 |
| 自定义 JDBC JAR | 湖仓联合体 | 什么是查询联合? |
警告
DBFS 访问在无服务器时受到限制。 在迁移之前更新 Unity 目录卷的所有 dbfs:/ 路径。 有关详细信息,请参阅 迁移存储在 DBFS 中的文件。
示例:替换 DBFS 路径和 Hive 元存储引用
# Classic
df = spark.read.csv("dbfs:/mnt/datalake/data.csv", header=True)
df.write.parquet("dbfs:/mnt/output/results")
df = spark.table("my_database.my_table")
# Serverless
df = spark.read.csv("/Volumes/main/sales/raw_data/data.csv", header=True)
df.write.parquet("/Volumes/main/analytics/output/results")
df = spark.table("main.my_database.my_table") # three-level namespace
API 和代码
某些 API 和代码模式在无服务器环境中不受支持。 引用此表以查看代码是否需要更新。
| 经典模式 | 无服务器替代方案 | 详细信息 |
|---|---|---|
RDD 接口 (sc.parallelize,rdd.map) |
DataFrame API | 将 Spark Connect 与 Spark 经典版进行比较 |
df.cache(), df.persist() |
删除缓存调用 | 无服务器计算限制 |
spark.sparkContext, sqlContext |
直接使用 spark (SparkSession) |
将 Spark Connect 与 Spark 经典版进行比较 |
Hive 变量 (${var}) |
SQL DECLARE VARIABLE 或 Python f-strings |
声明变量 |
| 不支持的 Spark 配置 | 删除不支持的配置。 无服务器自动优化大多数设置。 | 为无服务器笔记本和作业配置 Spark 属性 |
示例:将 RDD 操作替换为数据帧
from pyspark.sql import functions as F
# sc.parallelize + rdd.map
# Classic: rdd = sc.parallelize([1, 2, 3]); rdd.map(lambda x: x * 2).collect()
df = spark.createDataFrame([(1,), (2,), (3,)], ["value"])
result = df.select((F.col("value") * 2).alias("value")).collect()
# rdd.flatMap
# Classic: sc.parallelize(["hello world"]).flatMap(lambda l: l.split(" ")).collect()
df = spark.createDataFrame([("hello world",)], ["line"])
words = df.select(F.explode(F.split("line", " ")).alias("word")).collect()
# rdd.groupByKey
# Classic: rdd.groupByKey().mapValues(list).collect()
df = spark.createDataFrame([("a", 1), ("b", 2), ("a", 3)], ["key", "value"])
grouped = df.groupBy("key").agg(F.collect_list("value").alias("values")).collect()
# rdd.mapPartitions → applyInPandas
import pandas as pd
def process_group(pdf: pd.DataFrame) -> pd.DataFrame:
return pd.DataFrame({"total": [pdf["id"].sum()]})
result = (spark.range(100).repartition(4)
.groupBy(F.spark_partition_id())
.applyInPandas(process_group, schema="total long").collect())
# sc.textFile → spark.read.text
df = spark.read.text("/Volumes/catalog/schema/volume/file.txt")
示例:替换 SparkContext 和缓存
from pyspark.sql.functions import broadcast
# sc.broadcast → broadcast join
result = main_df.join(broadcast(lookup_df), "key")
# sc.accumulator → DataFrame aggregation
total = df.agg(F.sum("amount")).collect()[0][0]
# sqlContext.sql → spark.sql
result = spark.sql("SELECT * FROM main.db.table")
# df.cache() → remove caching calls
# Materialize expensive intermediate results to Delta as a workaround:
df = spark.read.parquet(path)
result = df.filter("status = 'active'")
expensive_df.write.format("delta").mode("overwrite").saveAsTable("main.scratch.temp")
result = spark.table("main.scratch.temp")
库和环境
利用基础环境在工作区级别管理库和环境,利用笔记本的无服务器环境在笔记本级别管理。
| 经典模式 | 无服务器替代方案 | 详细信息 |
|---|---|---|
| 初始化脚本 | 无服务器环境 | 配置无服务器环境 |
| 集群范围库 | 笔记本范围库或环境库 | 配置无服务器环境 |
| Maven/JAR 库 | JAR 任务对作业的支持; 针对笔记本的 PyPI 支持 | JAR 任务用于作业 |
| Docker 容器 | 适用于库需求的无服务器环境 | 配置无服务器环境 |
在 requirements.txt 中锁定 Python 包,以创建可重现的环境。 请参阅 Specify Python 包版本。
Streaming
无服务器架构支持流式工作负载,但不支持某些类型的触发器。 更新代码以使用支持的触发器。
| Spark 触发器 | 支持 | 注释 |
|---|---|---|
Trigger.AvailableNow() |
是 | 推荐 |
Trigger.Once() |
是 | 此功能已弃用。 请改用 Trigger.AvailableNow()。 |
Trigger.ProcessingTime(interval) |
否 | 返回 INFINITE_STREAMING_TRIGGER_NOT_SUPPORTED |
Trigger.Continuous(interval) |
否 | 改用 Lakeflow Spark 声明性管道连续模式 |
默认值(未设置 .trigger()) |
否 | 省略 .trigger() 会默认使用 ProcessingTime("0 seconds"),但这在无服务器环境中不被支持。 始终显式设置 .trigger(availableNow=True) 。 |
对于连续流式数据处理,请迁移到连续模式下的 Spark 声明性管道,或配合使用连续计划作业AvailableNow。 对于大型源,请设置 maxFilesPerTrigger 或 maxBytesPerTrigger 防止内存不足错误。
示例:修复流触发器
# Classic (not supported on serverless — default trigger is ProcessingTime)
query = df.writeStream.format("delta").outputMode("append").start()
# Serverless (explicit AvailableNow trigger)
query = (df.writeStream.format("delta").outputMode("append")
.trigger(availableNow=True)
.option("checkpointLocation", checkpoint_path)
.start(output_path))
query.awaitTermination()
# With OOM prevention for large sources
query = (spark.readStream.format("delta")
.option("maxFilesPerTrigger", 100)
.option("maxBytesPerTrigger", "10g")
.load(input_path)
.writeStream.format("delta")
.trigger(availableNow=True)
.option("checkpointLocation", checkpoint_path)
.start(output_path))
测试工作负荷
- 快速兼容性测试:使用 标准 访问模式和 Databricks Runtime 14.3 或更高版本在经典计算上运行工作负荷。 如果运行成功,工作负载可以迁移到无服务器,而无需进行任何代码更改。
- A/B 比较 (建议用于生产):在经典(控制)和无服务器(试验)上运行相同的工作负荷。 对比输出表格及验证正确性。 反复进行,直到输出匹配。
- 临时配置:可以在测试期间临时设置支持的 Spark 配置。 一旦稳定,请将其删除。
选择性能模式
无服务器作业和管道支持两种性能模式:标准和性能优化。 选择的性能模式取决于工作负荷要求。
| 模式 | 可用性 | Startup | 最适用于 |
|---|---|---|---|
| 标准 | 任务、Lakeflow Spark 声明式管道 | 4-6 分钟 | 成本敏感批处理 |
| 性能优化型 | 笔记本、工作、Lakeflow Spark 声明式管道 | 秒 | 交互式、延迟敏感 |
分阶段迁移
- 新工作负载:在无服务器上启动所有新的笔记本和作业。
- 低风险工作负荷:迁移已在标准访问模式和 Databricks Runtime 14.3 或更高版本上的 PySpark/SQL 工作负荷。
- 复杂工作负荷:迁移需要代码更改的工作负载(RDD 重写、DBFS 更新、触发器修复)。
- 剩余工作负荷:随着功能扩展而定期查看。
监控成本
无服务器计费基于 DBU 消耗,而不是群集运行时间。 在大规模迁移之前,先验证具有代表性工作负荷的成本预期。 有关监视无服务器成本的工具和策略,请参阅 “监视无服务器计算的成本”。
其他资源
- 无服务器计算的最佳做法:无服务器工作负荷的优化提示
- 无服务器计算限制:当前限制和不支持的功能的完整列表
- 配置无服务器环境:管理库和依赖项
- 支持的 Spark 配置:在无服务器环境中可用的 Spark 配置
- Spark Connect 与经典 Spark:无服务器体系结构中的行为差异
- 无服务器网络安全:NCC、专用链接 和防火墙配置
- 无服务器计算发布说明:跟踪随其发布的新功能
- Unity 目录升级指南:从 Hive 元存储迁移到 Unity 目录
还可以参阅以下博客文章了解详细信息:
- 什么是无服务器计算?:无服务器功能和客户结果概述
- 数据工程的演变:无服务器计算如何改变笔记本和 Lakeflow 作业:无服务器如何为 Lakeflow 作业和流水线提供支持