作业使用通过初始化脚本或群集定义的库引用自定义库的笔记本。 自定义库将定义为非公开可用的 pip 包或 jar,该包或 jar 执行嵌入在其代码中的 Apache Spark 或 SQL 读取或写入作。 |
修改自定义库以确保:
|
作业使用读取或写入 Hive 元存储表的笔记本。 |
- 评估在作业群集 Spark 配置中设置默认目录:
spark.databricks.sql.initial.catalog.name my_catalog - 评估工作区默认目录是否可设置为非hive_metastore,因此无需更改作业代码。
- 否则,请更改作业代码,将两级命名空间重命名为相应表的三级命名空间。
- 如果作业使用的是纯 SQL,请考虑添加语句
USE CATALOG 。
|
作业使用从表子文件夹读取或写入路径的笔记本。 这在 Unity 目录中是不可能的。 |
- 更改代码以从表读取分区列的谓词。
- 更改代码以使用
overwriteByPartition 其他适当选项写入表。
|
作业使用正在读取或写入到在 Unity 目录中注册的表的装载路径的笔记本 |
- 更改代码以引用正确的三级命名空间表。
- 如果表未注册或不会注册,则仍需要修改代码以写入卷路径,而不是装载路径。
|
作业使用通过装载路径读取或写入文件(而不是表)的笔记本。 |
请改为更改代码以写入卷位置。 |
作业是一个流式处理作业,它使用 applyInPandasWithState 。 |
当前不支持。 如果可能,请考虑重写,或在提供支持之前不尝试重构此作业。 |
作业是使用连续处理模式的流式处理作业。 |
连续处理模式在 Spark 中是实验性的,在 Unity 目录中不受支持。 重构作业以使用结构化流式处理。 如果无法执行此作,请考虑使作业针对 Hive 元存储运行。 |
作业是使用检查点目录的流式处理作业。 |
- 将检查点目录移动到卷。
- 更改笔记本中的代码以使用卷路径。
- 作业所有者应在该路径上具有读写。
- 停止作业。
- 将检查点移动到新卷位置。
- 重启作业。
|
作业的群集定义低于 Databricks Runtime 11.3。 |
- 将作业群集定义更改为 Databricks Runtime 11.3 或更高版本。
- 更改作业群集定义以使用指定或标准访问模式。
|
作业具有与存储或表交互的笔记本。 |
作业运行的服务主体必须提供对 Unity 目录中必需资源的读取和写入访问权限,例如卷、表、外部位置等。 |
作业是 Lakeflow 声明性管道。 |
- 将作业群集更改为 Databricks Runtime 13.1 或更高版本。
- 停止 Lakeflow 声明性管道作业。
- 将数据移动到 Unity 目录托管表。
- 将 Lakeflow 声明性管道作业定义更改为使用新的 Unity 目录托管表。
- 重启 Lakeflow 声明性管道作业。
|
作业具有使用非存储云服务(如 AWSKinesis)的笔记本,以及用于连接的配置使用实例配置文件。 |
- 修改代码以使用 Unity 目录服务凭据,该凭据通过生成 SDK 可使用的临时凭据来管理能够与非存储云服务交互的凭据。
|
作业使用 Scala |
- 如果低于 Databricks Runtime 13.3,请在专用计算上运行。
- Databricks Runtime 13.3 及更高版本支持标准群集。
|
作业具有使用 Scala UDF 的笔记本 |
- 如果低于 Databricks Runtime 13.3,请在专用计算上运行。
- Databricks Runtime 14.2 支持标准群集。
|
作业具有使用 MLR 的任务 |
在专用计算上运行。 |
作业具有依赖于全局初始化脚本的群集配置。 |
- 使用 Databricks Runtime 13.3 或更高版本进行完全支持。
- 修改为使用群集范围的 init 脚本或使用群集策略。 脚本、文件和包必须安装在 Unity 目录卷上才能运行。
|
作业具有使用 jars/Maven、Spark 扩展或自定义数据源(从 Spark)的群集配置或笔记本。 |
- 使用 Databricks Runtime 13.3 或更高版本。
- 使用群集策略安装库。
|
作业具有使用 PySpark UDF 的笔记本。 |
使用 Databricks Runtime 13.2 或更高版本。 |
作业具有具有进行网络调用的 Python 代码的笔记本。 |
使用 Databricks Runtime 12.2 或更高版本。 |
作业具有使用 Pandas UDF(标量)的笔记本。 |
使用 Databricks Runtime 13.2 或更高版本。 |
作业将使用 Unity 目录卷。 |
使用 Databricks Runtime 13.3 或更高版本。 |
作业具有使用 spark.catalog.X (tableExists 、 listTables 、 setDefaultCatalog )并使用共享群集运行的笔记本 |
- 使用 Databricks Runtime 14.2 或更高版本。
- 如果无法进行 Databricks Runtime 升级,请使用以下步骤:
tableExists 请改用以下代码:
# SQL workaround
def tableExistsSql(tablename):
try:
spark.sql(f"DESCRIBE TABLE {tablename};")
except Exception as e:
return False
return True
tableExistsSql("jakob.jakob.my_table") 而不是 listTables ,使用 SHOW TABLES (允许限制数据库或模式匹配):( 允许限制数据库或模式匹配):
spark.sql("SHOW TABLES") 对于 setDefaultCatalog 运行
spark.sql("USE CATALOG ")
|
作业具有使用内部 DButils API 的笔记本:命令上下文,并使用共享群集运行。 尝试访问命令访问权限的工作负荷,例如,使用 检索作业 ID
dbutils.notebook.entry_point.getDbutils().notebook().getContext().toJson() |
使用 .toJson() 而非 .safeToJson() 。 这会提供可在共享群集上安全共享的所有命令上下文信息的子集。 需要 Databricks Runtime 13.3 LTS+ |
作业具有使用 PySpark 的笔记本:spark.udf.registerJavaFunction 并使用共享群集运行 |
- 使用 Databricks Runtime 14.3 LTS 或更高版本
- 对于笔记本和作业,请使用 %scala 单元格通过 spark.udf.register 注册 Scala UDF。 Python 和 Scala 共享执行上下文时,Scala UDF 也将从 Python 获取。
- 对于使用 IDE(使用 Databricks Connect v2)的客户,唯一的选择是将 UDF 重写为 Unity 目录 Python UDF。 将来,我们计划扩展对 Unity 目录 UDF 的支持以支持 Scala。
|
作业具有使用 RDD 的笔记本:sc.parallelize 和 spark.read.json() 将 JSON 对象转换为 DF 并使用共享群集运行 |
例-
之前:
json_content1 = "{'json_col1': 'hello', 'json_col2': 32}"
json_content2 = "{'json_col1': 'hello', 'json_col2': 'world'}"
json_list = []
json_list.append(json_content1)
json_list.append(json_content2)
df = spark.read.json(sc.parallelize(json_list))
display(df)
之后:
from pyspark.sql import Row
import json
# Sample JSON data as a list of dictionaries (similar to JSON objects)
json_data_str = response.text
json_data = [json.loads(json_data_str)]
# Convert dictionaries to Row objects
rows = [Row(**json_dict) for json_dict in json_data]
# Create DataFrame from list of Row objects
df = spark.createDataFrame(rows)
df.display() |
作业具有使用 RDD 的笔记本:通过 sc.emptyRDD() 使用空数据帧并使用共享群集运行 |
例-
之前:
val schema = StructType( StructField("k", StringType, true) :: StructField("v", IntegerType, false) :: Nil)
spark.createDataFrame(sc.emptyRDD[Row], schema)
之后:
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
val schema = StructType( StructField("k", StringType, true) :: StructField("v", IntegerType, false) :: Nil)
spark.createDataFrame(new java.util.ArrayList[Row](), schema)
from pyspark.sql.types import StructType, StructField, StringType
schema = StructType([StructField("k", StringType(), True)])
spark.createDataFrame([], schema) |
作业具有使用 RDD 的笔记本:mapPartitions(成本高昂的初始化逻辑 + 每行更便宜的作),并使用共享群集运行 |
- 原因-
使用 Spark Connect 进行 Python/Scala 程序与 Spark Server 之间的通信的 Unity 目录共享群集,使 RDD 不再可访问。
之前: RDD 的典型用例是只执行一次昂贵的初始化逻辑,然后每行执行更便宜的作。 此类用例可以调用外部服务或初始化加密逻辑。
之后: 使用数据帧 API 和 PySpark 本机箭头 UDF 重写 RDD作。
|
作业具有使用 SparkContext (sc) 和 sqlContext 的笔记本,并使用共享群集运行 |
- 原因-
由于 Unity 目录共享群集体系结构和 SparkConnect,Spark 上下文 (sc) 和 sqlContext 的设计不可用。
如何解决: 使用 spark 变量与 SparkSession 实例交互 限制: 只能通过 Spark 命令直接从 Python/Scala REPL 访问 Spark JVM。 这意味着sc._jvm命令的设计将失败。 不支持以下 sc 命令:emptyRDD、 range, init_batched_serializer, parallelize, pickleFile, textFile, wholeTextFiles, binaryFiles, binaryRecords, sequenceFile, newAPIHadoopFIle, newAPIHadoopRDD, hadoopFile, hadoopRDD, union, runJob, setSystemProperty, uiWebUrl, stop, setJobGroup, setLocalProperty, getConf
|
作业具有使用 Spark Conf 的笔记本 - sparkContext.getConf 并使用共享群集运行 |
- 原因-
sparkContext、df.sparkContext、sc.sparkContext 和类似的 API 在设计上不可用。
如何解决: 请改用 spark.conf
|
作业具有使用 SparkContext - SetJobDescription() 的笔记本,并使用共享群集运行 |
- 原因-
sc.setJobDescription(“String”)由于 Unity 目录共享群集体系结构和 SparkConnect,设计上不可用。
如何解决: 如果可能,请改用标记 [PySpark 文档] spark.addTag() 可以附加标记,getTags() 和 interruptTag(tag) 可用于在标记存在/不存在时执行作 需要 Databricks Runtime 14.1+
|
作业具有使用命令(如 sc.setLogLevel(“INFO”)设置 Spark 日志级别的笔记本,并使用共享群集运行 |
- 原因-
在 Single-User 且没有隔离群集中,可以直接访问 Spark 上下文以动态设置驱动程序和执行程序之间的日志级别。 在共享群集上,无法从 Spark 上下文访问此方法,在 Databricks Runtime 14+ 中,Spark 上下文不再可用。
如何解决: 若要在不提供 log4j.conf 的情况下控制日志级别,现在可以在群集设置中使用 Spark 配置值。 通过将 spark.log.level 设置为 DEBUG、WARN、INFO、ERROR 作为群集设置中的 Spark 配置值来使用 Spark 日志级别。
|
作业具有使用深度嵌套表达式/查询并使用共享群集运行的笔记本 |
- 原因-
RecursionError / Protobuf 最大嵌套级别已超出(对于深度嵌套表达式/查询) 使用 PySpark 数据帧 API 以递归方式创建深度嵌套的数据帧和表达式时,在某些情况下,可能会出现以下任一情况:
- Python 异常:RecursionError:超出最大递归深度
- SparkConnectGprcException:Protobuf 超出最大嵌套级别
如何解决: 若要规避该问题,请识别深层嵌套的代码路径,并使用线性表达式/子查询或临时视图重写它们。 例如:而不是以递归方式调用 df.withColumn,而是改为调用 df.withColumns(dict)。
|
作业具有在代码中使用 input_file_name() 的笔记本,并使用共享群集运行 |
- 原因-
共享群集的 Unity 目录中不支持 input_file_name()。
如何解决: 获取文件名
.withColumn("RECORD_FILE_NAME", col("_metadata.file_name")) 适用于 spark.read 获取整个文件路径
.withColumn("RECORD_FILE_PATH", col("_metadata.file_path")) 适用于 spark.read
|
作业具有对 DBFS 文件系统执行数据作并使用共享群集运行的笔记本 |
- 原因-
使用 FUSE 服务将 DBFS 与共享群集配合使用时,它无法访问文件系统并生成找不到文件错误
示例: 下面是使用对 DBFS 的共享群集访问失败时的一些示例
with open('/dbfs/test/sample_file.csv', 'r') as file:
ls -ltr /dbfs/test
cat /dbfs/test/sample_file.csv
如何解决: 任一使用 -
- Databricks Unity 目录卷而不是使用 DBFS (首选)
- 更新代码以使用通过直接访问存储访问路径的 dbutils 或 spark,并被授予从共享群集访问 DBFS 的权限
|