将旧工作区升级到 Unity 目录时更新作业

旧工作区升级到 Unity 目录时,可能需要更新现有作业以引用升级后的表和文件路径。 下表列出了更新作业的典型方案和建议。

情景 解决方案
作业使用通过初始化脚本或群集定义的库引用自定义库的笔记本。
自定义库将定义为非公开可用的 pip 包或 jar,该包或 jar 执行嵌入在其代码中的 Apache Spark 或 SQL 读取或写入作。
修改自定义库以确保:
  • 数据库名称是三级命名空间。
  • 装入点不在代码中使用。
作业使用读取或写入 Hive 元存储表的笔记本。
  • 评估在作业群集 Spark 配置中设置默认目录: spark.databricks.sql.initial.catalog.name my_catalog
  • 评估工作区默认目录是否可设置为非hive_metastore,因此无需更改作业代码。
  • 否则,请更改作业代码,将两级命名空间重命名为相应表的三级命名空间。
  • 如果作业使用的是纯 SQL,请考虑添加语句 USE CATALOG
作业使用从表子文件夹读取或写入路径的笔记本。 这在 Unity 目录中是不可能的。
  • 更改代码以从表读取分区列的谓词。
  • 更改代码以使用 overwriteByPartition 其他适当选项写入表。
作业使用正在读取或写入到在 Unity 目录中注册的表的装载路径的笔记本
  • 更改代码以引用正确的三级命名空间表。
  • 如果表未注册或不会注册,则仍需要修改代码以写入卷路径,而不是装载路径。
作业使用通过装载路径读取或写入文件(而不是表)的笔记本。 请改为更改代码以写入卷位置。
作业是一个流式处理作业,它使用 applyInPandasWithState 当前不支持。 如果可能,请考虑重写,或在提供支持之前不尝试重构此作业。
作业是使用连续处理模式的流式处理作业。 连续处理模式在 Spark 中是实验性的,在 Unity 目录中不受支持。 重构作业以使用结构化流式处理。 如果无法执行此作,请考虑使作业针对 Hive 元存储运行。
作业是使用检查点目录的流式处理作业。
  • 将检查点目录移动到卷。
  • 更改笔记本中的代码以使用卷路径。
  • 作业所有者应在该路径上具有读写。
  • 停止作业。
  • 将检查点移动到新卷位置。
  • 重启作业。
作业的群集定义低于 Databricks Runtime 11.3。
  • 将作业群集定义更改为 Databricks Runtime 11.3 或更高版本。
  • 更改作业群集定义以使用指定或标准访问模式。
作业具有与存储或表交互的笔记本。 作业运行的服务主体必须提供对 Unity 目录中必需资源的读取和写入访问权限,例如卷、表、外部位置等。
作业是 Lakeflow 声明性管道。
  • 将作业群集更改为 Databricks Runtime 13.1 或更高版本。
  • 停止 Lakeflow 声明性管道作业。
  • 将数据移动到 Unity 目录托管表。
  • 将 Lakeflow 声明性管道作业定义更改为使用新的 Unity 目录托管表。
  • 重启 Lakeflow 声明性管道作业。
作业具有使用非存储云服务(如 AWSKinesis)的笔记本,以及用于连接的配置使用实例配置文件。
  • 修改代码以使用 Unity 目录服务凭据,该凭据通过生成 SDK 可使用的临时凭据来管理能够与非存储云服务交互的凭据。
作业使用 Scala
  • 如果低于 Databricks Runtime 13.3,请在专用计算上运行。
  • Databricks Runtime 13.3 及更高版本支持标准群集。
作业具有使用 Scala UDF 的笔记本
  • 如果低于 Databricks Runtime 13.3,请在专用计算上运行。
  • Databricks Runtime 14.2 支持标准群集。
作业具有使用 MLR 的任务 在专用计算上运行。
作业具有依赖于全局初始化脚本的群集配置。
  • 使用 Databricks Runtime 13.3 或更高版本进行完全支持。
  • 修改为使用群集范围的 init 脚本或使用群集策略。 脚本、文件和包必须安装在 Unity 目录卷上才能运行。
作业具有使用 jars/Maven、Spark 扩展或自定义数据源(从 Spark)的群集配置或笔记本。
  • 使用 Databricks Runtime 13.3 或更高版本。
  • 使用群集策略安装库。
作业具有使用 PySpark UDF 的笔记本。 使用 Databricks Runtime 13.2 或更高版本。
作业具有具有进行网络调用的 Python 代码的笔记本。 使用 Databricks Runtime 12.2 或更高版本。
作业具有使用 Pandas UDF(标量)的笔记本。 使用 Databricks Runtime 13.2 或更高版本。
作业将使用 Unity 目录卷。 使用 Databricks Runtime 13.3 或更高版本。
作业具有使用 spark.catalog.XtableExistslistTablessetDefaultCatalog)并使用共享群集运行的笔记本
  • 使用 Databricks Runtime 14.2 或更高版本。
  • 如果无法进行 Databricks Runtime 升级,请使用以下步骤:
    tableExists请改用以下代码:
    # SQL workaround
    def tableExistsSql(tablename):
    try:
    spark.sql(f"DESCRIBE TABLE {tablename};")
    except Exception as e:
    return False
    return True
    tableExistsSql("jakob.jakob.my_table")
    而不是 listTables,使用 SHOW TABLES (允许限制数据库或模式匹配):( 允许限制数据库或模式匹配):
    spark.sql("SHOW TABLES")
    对于 setDefaultCatalog 运行
    spark.sql("USE CATALOG ")
作业具有使用内部 DButils API 的笔记本:命令上下文,并使用共享群集运行。
尝试访问命令访问权限的工作负荷,例如,使用 检索作业 ID
dbutils.notebook.entry_point.getDbutils().notebook().getContext().toJson()
使用 .toJson() 而非 .safeToJson()。 这会提供可在共享群集上安全共享的所有命令上下文信息的子集。
需要 Databricks Runtime 13.3 LTS+
作业具有使用 PySpark 的笔记本:spark.udf.registerJavaFunction 并使用共享群集运行
  • 使用 Databricks Runtime 14.3 LTS 或更高版本
  • 对于笔记本和作业,请使用 %scala 单元格通过 spark.udf.register 注册 Scala UDF。 Python 和 Scala 共享执行上下文时,Scala UDF 也将从 Python 获取。
  • 对于使用 IDE(使用 Databricks Connect v2)的客户,唯一的选择是将 UDF 重写为 Unity 目录 Python UDF。 将来,我们计划扩展对 Unity 目录 UDF 的支持以支持 Scala。
作业具有使用 RDD 的笔记本:sc.parallelize 和 spark.read.json() 将 JSON 对象转换为 DF 并使用共享群集运行
  • 改用 json.loads

例-
之前:
json_content1 = "{'json_col1': 'hello', 'json_col2': 32}"
json_content2 = "{'json_col1': 'hello', 'json_col2': 'world'}"
json_list = []
json_list.append(json_content1)
json_list.append(json_content2)
df = spark.read.json(sc.parallelize(json_list))
display(df)
之后:
from pyspark.sql import Row
import json
# Sample JSON data as a list of dictionaries (similar to JSON objects)
json_data_str = response.text
json_data = [json.loads(json_data_str)]
# Convert dictionaries to Row objects
rows = [Row(**json_dict) for json_dict in json_data]
# Create DataFrame from list of Row objects
df = spark.createDataFrame(rows)
df.display()
作业具有使用 RDD 的笔记本:通过 sc.emptyRDD() 使用空数据帧并使用共享群集运行 例-
之前:
val schema = StructType( StructField("k", StringType, true) :: StructField("v", IntegerType, false) :: Nil)
spark.createDataFrame(sc.emptyRDD[Row], schema)
之后:
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
val schema = StructType( StructField("k", StringType, true) :: StructField("v", IntegerType, false) :: Nil)
spark.createDataFrame(new java.util.ArrayList[Row](), schema)
from pyspark.sql.types import StructType, StructField, StringType
schema = StructType([StructField("k", StringType(), True)])
spark.createDataFrame([], schema)
作业具有使用 RDD 的笔记本:mapPartitions(成本高昂的初始化逻辑 + 每行更便宜的作),并使用共享群集运行
  • 原因-
    使用 Spark Connect 进行 Python/Scala 程序与 Spark Server 之间的通信的 Unity 目录共享群集,使 RDD 不再可访问。
    之前:
    RDD 的典型用例是只执行一次昂贵的初始化逻辑,然后每行执行更便宜的作。 此类用例可以调用外部服务或初始化加密逻辑。
    之后:
    使用数据帧 API 和 PySpark 本机箭头 UDF 重写 RDD作。
作业具有使用 SparkContext (sc) 和 sqlContext 的笔记本,并使用共享群集运行
  • 原因-
    由于 Unity 目录共享群集体系结构和 SparkConnect,Spark 上下文 (sc) 和 sqlContext 的设计不可用。
    如何解决:
    使用 spark 变量与 SparkSession 实例交互
    限制:
    只能通过 Spark 命令直接从 Python/Scala REPL 访问 Spark JVM。 这意味着sc._jvm命令的设计将失败。
    不支持以下 sc 命令:emptyRDD、 range, init_batched_serializer, parallelize, pickleFile, textFile, wholeTextFiles, binaryFiles, binaryRecords, sequenceFile, newAPIHadoopFIle, newAPIHadoopRDD, hadoopFile, hadoopRDD, union, runJob, setSystemProperty, uiWebUrl, stop, setJobGroup, setLocalProperty, getConf
作业具有使用 Spark Conf 的笔记本 - sparkContext.getConf 并使用共享群集运行
  • 原因-
    sparkContext、df.sparkContext、sc.sparkContext 和类似的 API 在设计上不可用。
    如何解决:
    请改用 spark.conf
作业具有使用 SparkContext - SetJobDescription() 的笔记本,并使用共享群集运行
  • 原因-
    sc.setJobDescription(“String”)由于 Unity 目录共享群集体系结构和 SparkConnect,设计上不可用。
    如何解决:
    如果可能,请改用标记 [PySpark 文档]
    spark.addTag() 可以附加标记,getTags() 和 interruptTag(tag) 可用于在标记存在/不存在时执行作
    需要 Databricks Runtime 14.1+
作业具有使用命令(如 sc.setLogLevel(“INFO”)设置 Spark 日志级别的笔记本,并使用共享群集运行
  • 原因-
    在 Single-User 且没有隔离群集中,可以直接访问 Spark 上下文以动态设置驱动程序和执行程序之间的日志级别。 在共享群集上,无法从 Spark 上下文访问此方法,在 Databricks Runtime 14+ 中,Spark 上下文不再可用。
    如何解决:
    若要在不提供 log4j.conf 的情况下控制日志级别,现在可以在群集设置中使用 Spark 配置值。 通过将 spark.log.level 设置为 DEBUG、WARN、INFO、ERROR 作为群集设置中的 Spark 配置值来使用 Spark 日志级别。
作业具有使用深度嵌套表达式/查询并使用共享群集运行的笔记本
  • 原因-
    RecursionError / Protobuf 最大嵌套级别已超出(对于深度嵌套表达式/查询)
    使用 PySpark 数据帧 API 以递归方式创建深度嵌套的数据帧和表达式时,在某些情况下,可能会出现以下任一情况:
    • Python 异常:RecursionError:超出最大递归深度
    • SparkConnectGprcException:Protobuf 超出最大嵌套级别

    如何解决:
    若要规避该问题,请识别深层嵌套的代码路径,并使用线性表达式/子查询或临时视图重写它们。
    例如:而不是以递归方式调用 df.withColumn,而是改为调用 df.withColumns(dict)。
作业具有在代码中使用 input_file_name() 的笔记本,并使用共享群集运行
  • 原因-
    共享群集的 Unity 目录中不支持 input_file_name()。
    如何解决:
    获取文件名
    .withColumn("RECORD_FILE_NAME", col("_metadata.file_name"))
    适用于 spark.read
    获取整个文件路径
    .withColumn("RECORD_FILE_PATH", col("_metadata.file_path"))
    适用于 spark.read
作业具有对 DBFS 文件系统执行数据作并使用共享群集运行的笔记本
  • 原因-
    使用 FUSE 服务将 DBFS 与共享群集配合使用时,它无法访问文件系统并生成找不到文件错误
    示例:
    下面是使用对 DBFS 的共享群集访问失败时的一些示例
    with open('/dbfs/test/sample_file.csv', 'r') as file:
    ls -ltr /dbfs/test
    cat /dbfs/test/sample_file.csv
    如何解决:
    任一使用 -
    • Databricks Unity 目录卷而不是使用 DBFS (首选)
    • 更新代码以使用通过直接访问存储访问路径的 dbutils 或 spark,并被授予从共享群集访问 DBFS 的权限