从经典计算迁移到无服务器计算

将工作负荷从经典计算迁移到无服务器计算。 无服务器计算会自动处理预配、缩放、运行时升级和优化。

大多数经典工作负荷只需进行最少或没有代码更改即可迁移。 本页重点介绍这些工作负载。 某些功能(例如 df.cache,在无服务器)上尚不受支持,但一旦可用,就不需要更改代码。 依赖于 R 或 Scala 笔记本的某些工作负荷需要经典计算,并且无法迁移到无服务器。 有关当前限制的完整列表,请参阅 无服务器计算限制

迁移步骤

若要将工作负荷从经典计算迁移到无服务器计算,请执行以下步骤:

  1. 检查先决条件:验证工作区、网络和云存储访问是否符合要求。 请参阅准备工作
  2. 更新代码:进行任何必要的代码和配置更改。 请参阅 “更新代码”。
  3. 测试工作负载:在切换之前验证兼容性和正确性。 请参阅 “测试工作负荷”。
  4. 选择性能模式:选择最符合工作负荷要求的性能模式。 请参阅 “选择性能模式”。
  5. 分阶段迁移:从新的低风险工作负载开始,以增量方式推出无服务器工作负荷。 请参阅 分阶段迁移
  6. 监视成本:跟踪无服务器 DBU 消耗并设置警报。 请参阅 “监视成本”。

在您开始之前

在开始迁移之前,可能需要更新工作区中的某些旧配置。

先决条件 Action 详细信息
启用 Unity Catalog 工作区 如有需要,从 Hive Metastore 迁移 将Azure Databricks工作区更新到 Unity 目录
已配置网络 将 VPC 对等连接 (VPC peering) 替换为网络控制中心 (NCC)、专用链接 或防火墙规则 无服务器计算平面网络
云存储访问 将旧数据访问模式替换为 Unity 目录外部位置 使用 Unity Catalog 连接到云对象存储

确认工作区位于 受支持的区域中

更新代码

以下部分列出了使工作负载与无服务器兼容的代码和配置更改。

数据存取

无服务器架构不支持旧数据访问模式。 请更新代码以使用 Unity Catalog。

经典模式 无服务器替代方案 详细信息
DBFS 路径 (dbfs:/... Unity Catalog 卷 什么是 Unity Catalog 容量?
Hive Metastore 表 Unity Catalog 表(或 HMS Federation) 将Azure Databricks工作区更新到 Unity 目录
存储帐户凭据 Unity Catalog 的外部位置 使用 Unity Catalog 连接到云对象存储
自定义 JDBC JAR 湖仓联合体 什么是查询联合?

警告

DBFS 访问在无服务器时受到限制。 在迁移之前更新 Unity 目录卷的所有 dbfs:/ 路径。 有关详细信息,请参阅 迁移存储在 DBFS 中的文件

示例:替换 DBFS 路径和 Hive 元存储引用
# Classic
df = spark.read.csv("dbfs:/mnt/datalake/data.csv", header=True)
df.write.parquet("dbfs:/mnt/output/results")
df = spark.table("my_database.my_table")

# Serverless
df = spark.read.csv("/Volumes/main/sales/raw_data/data.csv", header=True)
df.write.parquet("/Volumes/main/analytics/output/results")
df = spark.table("main.my_database.my_table")  # three-level namespace

API 和代码

某些 API 和代码模式在无服务器环境中不受支持。 引用此表以查看代码是否需要更新。

经典模式 无服务器替代方案 详细信息
RDD 接口 (sc.parallelizerdd.map DataFrame API 将 Spark Connect 与 Spark 经典版进行比较
df.cache(), df.persist() 删除缓存调用 无服务器计算限制
spark.sparkContext, sqlContext 直接使用 spark (SparkSession) 将 Spark Connect 与 Spark 经典版进行比较
Hive 变量 (${var} SQL DECLARE VARIABLE 或 Python f-strings 声明变量
不支持的 Spark 配置 删除不支持的配置。 无服务器自动优化大多数设置。 为无服务器笔记本和作业配置 Spark 属性
示例:将 RDD 操作替换为数据帧
from pyspark.sql import functions as F

# sc.parallelize + rdd.map
# Classic:  rdd = sc.parallelize([1, 2, 3]); rdd.map(lambda x: x * 2).collect()
df = spark.createDataFrame([(1,), (2,), (3,)], ["value"])
result = df.select((F.col("value") * 2).alias("value")).collect()

# rdd.flatMap
# Classic:  sc.parallelize(["hello world"]).flatMap(lambda l: l.split(" ")).collect()
df = spark.createDataFrame([("hello world",)], ["line"])
words = df.select(F.explode(F.split("line", " ")).alias("word")).collect()

# rdd.groupByKey
# Classic:  rdd.groupByKey().mapValues(list).collect()
df = spark.createDataFrame([("a", 1), ("b", 2), ("a", 3)], ["key", "value"])
grouped = df.groupBy("key").agg(F.collect_list("value").alias("values")).collect()

# rdd.mapPartitions → applyInPandas
import pandas as pd
def process_group(pdf: pd.DataFrame) -> pd.DataFrame:
    return pd.DataFrame({"total": [pdf["id"].sum()]})
result = (spark.range(100).repartition(4)
    .groupBy(F.spark_partition_id())
    .applyInPandas(process_group, schema="total long").collect())

# sc.textFile → spark.read.text
df = spark.read.text("/Volumes/catalog/schema/volume/file.txt")
示例:替换 SparkContext 和缓存
from pyspark.sql.functions import broadcast

# sc.broadcast → broadcast join
result = main_df.join(broadcast(lookup_df), "key")

# sc.accumulator → DataFrame aggregation
total = df.agg(F.sum("amount")).collect()[0][0]

# sqlContext.sql → spark.sql
result = spark.sql("SELECT * FROM main.db.table")

# df.cache() → remove caching calls
# Materialize expensive intermediate results to Delta as a workaround:
df = spark.read.parquet(path)
result = df.filter("status = 'active'")
expensive_df.write.format("delta").mode("overwrite").saveAsTable("main.scratch.temp")
result = spark.table("main.scratch.temp")

库和环境

利用基础环境在工作区级别管理库和环境,利用笔记本的无服务器环境在笔记本级别管理。

经典模式 无服务器替代方案 详细信息
初始化脚本 无服务器环境 配置无服务器环境
集群范围库 笔记本范围库或环境库 配置无服务器环境
Maven/JAR 库 JAR 任务对作业的支持; 针对笔记本的 PyPI 支持 JAR 任务用于作业
Docker 容器 适用于库需求的无服务器环境 配置无服务器环境

requirements.txt 中锁定 Python 包,以创建可重现的环境。 请参阅 Specify Python 包版本

Streaming

无服务器架构支持流式工作负载,但不支持某些类型的触发器。 更新代码以使用支持的触发器。

Spark 触发器 支持 注释
Trigger.AvailableNow() 推荐
Trigger.Once() 此功能已弃用。 请改用 Trigger.AvailableNow()
Trigger.ProcessingTime(interval) 返回 INFINITE_STREAMING_TRIGGER_NOT_SUPPORTED
Trigger.Continuous(interval) 改用 Lakeflow Spark 声明性管道连续模式
默认值(未设置 .trigger() 省略 .trigger() 会默认使用 ProcessingTime("0 seconds"),但这在无服务器环境中不被支持。 始终显式设置 .trigger(availableNow=True)

对于连续流式数据处理,请迁移到连续模式下的 Spark 声明性管道,或配合使用连续计划作业AvailableNow。 对于大型源,请设置 maxFilesPerTriggermaxBytesPerTrigger 防止内存不足错误。

示例:修复流触发器
# Classic (not supported on serverless — default trigger is ProcessingTime)
query = df.writeStream.format("delta").outputMode("append").start()

# Serverless (explicit AvailableNow trigger)
query = (df.writeStream.format("delta").outputMode("append")
    .trigger(availableNow=True)
    .option("checkpointLocation", checkpoint_path)
    .start(output_path))
query.awaitTermination()

# With OOM prevention for large sources
query = (spark.readStream.format("delta")
    .option("maxFilesPerTrigger", 100)
    .option("maxBytesPerTrigger", "10g")
    .load(input_path)
    .writeStream.format("delta")
    .trigger(availableNow=True)
    .option("checkpointLocation", checkpoint_path)
    .start(output_path))

测试工作负荷

  1. 快速兼容性测试:使用 标准 访问模式和 Databricks Runtime 14.3 或更高版本在经典计算上运行工作负荷。 如果运行成功,工作负载可以迁移到无服务器,而无需进行任何代码更改。
  2. A/B 比较 (建议用于生产):在经典(控制)和无服务器(试验)上运行相同的工作负荷。 对比输出表格及验证正确性。 反复进行,直到输出匹配。
  3. 临时配置:可以在测试期间临时设置支持的 Spark 配置。 一旦稳定,请将其删除。

选择性能模式

无服务器作业和管道支持两种性能模式:标准和性能优化。 选择的性能模式取决于工作负荷要求。

模式 可用性 Startup 最适用于
标准 任务、Lakeflow Spark 声明式管道 4-6 分钟 成本敏感批处理
性能优化型 笔记本、工作、Lakeflow Spark 声明式管道 交互式、延迟敏感

分阶段迁移

  1. 新工作负载:在无服务器上启动所有新的笔记本和作业。
  2. 低风险工作负荷:迁移已在标准访问模式和 Databricks Runtime 14.3 或更高版本上的 PySpark/SQL 工作负荷。
  3. 复杂工作负荷:迁移需要代码更改的工作负载(RDD 重写、DBFS 更新、触发器修复)。
  4. 剩余工作负荷:随着功能扩展而定期查看。

监控成本

无服务器计费基于 DBU 消耗,而不是群集运行时间。 在大规模迁移之前,先验证具有代表性工作负荷的成本预期。 有关监视无服务器成本的工具和策略,请参阅 “监视无服务器计算的成本”。

其他资源

还可以参阅以下博客文章了解详细信息: