Spark API 选项参考

此页面列出了用于读取和写入数据的 Spark API 的可用输入和输出选项。

DataFrameReader 选项

将这些选项用于 DataFrameReader.option()DataFrameReader.options()read_filesCOPY INTOAuto Loader 来控制Azure Databricks读取数据文件的方式。

Example

以下示例设置为multiLineTrue读取 JSON 文件:

Python
df = spark.read.format("json").option("multiLine", True).load("/path/to/data")
Scala
val df = spark.read.format("json").option("multiLine", "true").load("/path/to/data")
SQL
SELECT * FROM read_files("/path/to/data", format => "json", multiLine => true)

Common

以下选项适用于所有文件格式。

密钥 默认 有效值 说明
ignoreCorruptFiles false truefalse 是否忽略损坏的文件。 如果为 true,则当遇到损坏的文件时,Spark 作业将继续运行,并且仍会返回已读取的内容。 因此COPY INTO,可以观察已跳过的文件,numSkippedCorruptFilesoperationMetrics Delta Lake 历史记录列中所示。 在 Databricks Runtime 11.3 LTS 及更高版本中可用。
ignoreMissingFiles false 用于自动加载程序( trueCOPY INTO 旧版) truefalse 是否忽略缺少的文件。 如果为 true,则 Spark 作业在遇到缺少的文件且仍返回内容时继续运行。 在 Databricks Runtime 11.3 LTS 及更高版本中可用。
modifiedAfter None 时间戳字符串 可选时间戳作为筛选器,用于仅引入在指定时间戳之后具有修改时间戳的文件。
modifiedBefore None 时间戳字符串 可选时间戳作为筛选器,用于仅引入在指定时间戳之前具有修改时间戳的文件。
pathGlobFilterfileNamePattern None glob 模式字符串 选择文件的潜在 glob 模式。 等效于 PATTERNCOPY INTO 旧版)。 fileNamePattern 可在 read_files 中使用。
recursiveFileLookup false truefalse 当此选项搜索嵌套目录时 true,即使它们的名称不遵循分区命名方案(例如 date=2019-07-01)。

Avro

读取 Avro 文件时,以下选项适用。

密钥 默认 有效值 说明
avroSchema None Avro 架构字符串 用户以 Avro 格式指定的可选架构。 读取 Avro 时,此选项可以设置为兼容但不同于实际 Avro 架构的不断发展架构。 反序列化架构与不断发展的架构一致。 例如,如果设置一个具有默认值的其他列的演变架构,则读取结果也包含新列。
avroSchemaEvolutionMode none nonerestart 如何使用架构注册表处理架构演变。 none 忽略架构更改并继续执行作业。 restart UnknownFieldException在检测到架构更改并要求重启作业时引发架构更改。
datetimeRebaseMode LEGACY EXCEPTIONLEGACYCORRECTED 控制 DATE 和 TIMESTAMP 值在儒略历与外推格里历之间的基本值重定。
enableStableIdentifiersForUnionType false truefalse 是否对 Avro Union 类型使用稳定的字段名称。 启用后,联合类型字段名称派生自其小写类型名称(例如, member_intmember_string。 如果两个类型名称在下限后相同,则引发异常。
mergeSchema false truefalse 是否在多个文件中推断模式并合并每个文件的模式。 Avro 的 mergeSchema 不放宽数据类型。
mode FAILFAST FAILFASTPERMISSIVEDROPMALFORMED 用于处理损坏记录的分析器模式。 FAILFAST 引发异常。 PERMISSIVE 将格式不正确的字段设置为 null。 DROPMALFORMED 静默删除错误的记录。
readerCaseSensitive true truefalse 指定启用 rescuedDataColumn 时区分大小写的行为。 如果为 true,请拯救名称不同于架构的数据列。 如果为 false,则以不区分大小写的方式读取数据。
recursiveFieldMaxDepth None 015 递归 Avro 字段的最大递归深度。 设置为1截断所有递归字段,2以允许一级递归等。15 取消设置或 0不允许递归字段。
rescuedDataColumn None 列名字符串 是否将因数据类型不匹配和架构不匹配(包括列大小写)而无法分析的所有数据收集到单独的列。 使用自动加载程序时,默认包含此列。
COPY INTO(旧版)不支持已获救的数据列,因为无法使用 COPY INTO手动设置架构。 Databricks 建议对大多数引入方案使用自动加载程序。
有关更多详细信息,请参考什么是已恢复的数据列?
stableIdentifierPrefixForUnionType member_ 任意字符串 用于稳定联合类型字段名称的 enableStableIdentifiersForUnionType=true前缀。

CSV

读取 CSV 文件时,以下选项适用。

密钥 默认 有效值 说明
badRecordsPath None 路径字符串 用于记录错误 CSV 记录信息的文件存储路径。
charToEscapeQuoteEscaping \0 单个字符 用来对引号的转义字符进行转义的字符。 例如,对于以下记录:[ " a\\", b ]
  • 如果未定义用来对 '\' 进行转义的字符,则不会分析记录。 分析程序会将字符读取为 [a],[\],["],[,],[ ],[b],并引发错误,因为它找不到右引号。
  • 如果将转义 '\' 的字符定义为 '\',则记录将以 2 个值读取:[a\][b]
columnNameOfCorruptRecord _corrupt_record 列名字符串 支持自动加载程序。 不支持 COPY INTO(旧版)。
用于存储因格式不正确而无法分析的记录的列。 如果用于分析的 mode 设置为 DROPMALFORMED,则此列将为空。
comment \0 单个字符 定义表示行注释的字符(位于文本行的开头时)。 请使用 '\0' 来禁用注释跳过。
dateFormat yyyy-MM-dd 日期格式字符串 用于分析日期字符串的格式。
emptyValue 空字符串 任意字符串 空值的字符串表示形式。
enableDateTimeParsingFallback false truefalse 当不能使用指定格式分析值时,是否回退到旧日期和时间时间戳分析行为。 当 false,分析失败时会引发错误或生成 null, mode具体取决于。
encodingcharset UTF-8 名称java.nio.charset.Charset CSV 文件编码的名称。 有关选项列表,请参阅 java.nio.charset.Charset。 当 UTF-16UTF-32 时,不能使用 multilinetrue
enforceSchema true truefalse 是否将指定的或推理出的架构强制应用于 CSV 文件。 如果启用此选项,则会忽略 CSV 文件的标题。 默认情况下,当使用自动加载程序来补救数据并允许架构演变时,会忽略此选项。
escape \ 单个字符 分析数据时要使用的转义字符。
extension csv 文件扩展名字符串 读取的预期文件扩展名。 不带此扩展名的文件将被筛选掉。
failOnUnknownFields false truefalse CSV 记录是否包含架构中不存在的列时是否失败。 何时 false,无法识别的列将静默删除或救援, rescuedDataColumn具体取决于。
failOnWidenedFields false truefalse 如果字段值在未扩大的情况下无法解析为声明的架构类型,则是否失败。 当 false,类型扩大的值会根据情况 rescuedDataColumn以无提示方式进行救援。 设置 failOnUnknownFields=true 可以屏蔽此选项的效果。
header false truefalse CSV 文件是否包含标题。 自动加载程序在推理架构时会假定文件具有标题。
ignoreLeadingWhiteSpace false truefalse 是否忽略每个所分析值的前导空格。
ignoreTrailingWhiteSpace false truefalse 是否忽略每个解析值的尾随空格。
inferSchema false truefalse 是推断所解析的 CSV 记录的数据类型,还是假定所有列都是 StringType 类型的。 如果设置为 true,则需要对数据进行另一轮操作。 对于自动加载程序,请改用 cloudFiles.inferColumnTypes
inputBufferSize 1048576 (1 MB) 正整数 CSV 分析程序缓冲区大小(以字节为单位)。 用于在分析大型 CSV 文件时优化内存使用情况。
lineSep 无,涵盖 \r\r\n\n 一个字符串 两个连续 CSV 记录之间的字符串。
locale US 标识符java.util.Locale 标识Java区域设置,影响 CSV 中的默认日期、时间戳和十进制分析。
maxCharsPerColumn -1 正整数或 -1 无限制 预期要解析的值的最大字符数。 可用于避免内存错误。 默认为 -1,表示无限制。
maxColumns 20480 正整数 记录可以包含的列数的硬限制。
mergeSchema false truefalse 是否在多个文件中推断模式并合并每个文件的模式。 已默认在推理架构时为自动加载程序启用。
mode PERMISSIVE PERMISSIVEDROPMALFORMEDFAILFAST 围绕处理格式错误的记录提供的分析程序模式。
multiLine false truefalse CSV 记录是否跨多行。
nanValue NaN 任意字符串 分析 FloatTypeDoubleType 列时非数字值的字符串表示形式。
negativeInf -Inf 任意字符串 分析 FloatTypeDoubleType 列时负无穷大的字符串表示形式。
nullValue 空字符串 任意字符串 null 值的字符串表示形式。
parserCaseSensitive(已弃用) false truefalse 读取文件时,将标题中声明的列与架构对齐时是否区分大小写。 对于自动加载程序,此项默认为 true。 如果启用,则会在 rescuedDataColumn 中补救大小写不同的列。 出于对 readerCaseSensitive 的偏好,已不推荐使用此选项。
positiveInf Inf 任意字符串 分析 FloatTypeDoubleType 列时正无穷大的字符串表示形式。
preferDate true truefalse 如果可能,尝试将字符串推断为日期而不是时间戳。 此外,还必须通过启用 inferSchema 或使用 cloudFiles.inferColumnTypes 自动加载程序来使用架构推理。
quote " 单个字符 当字段分隔符是值的一部分时用于对值进行转义的字符。
readerCaseSensitive true truefalse 指定启用 rescuedDataColumn 时区分大小写的行为。 如果为 true,请拯救名称不同于架构的数据列。 如果为 false,则以不区分大小写的方式读取数据。
rescuedDataColumn None 列名字符串 是否将因数据类型不匹配和架构不匹配(包括列大小写)而无法分析的所有数据收集到单独的列。 使用自动加载程序时,默认包含此列。 有关更多详细信息,请参考什么是已恢复的数据列?
COPY INTO(旧版)不支持已获救的数据列,因为无法使用 COPY INTO手动设置架构。 Databricks 建议对大多数引入方案使用自动加载程序。
sepdelimiter , 一个字符串 列之间的分隔符字符串。
singleVariantColumn None 列名字符串 当设置为列名时,将整个 CSV 记录读入具有该名称的单个 VariantType 列,而不是将每个字段分析为其自己的列。 需要 header=true
skipRows 0 正整数或 0 应忽略的 CSV 文件开头的行数,包括注释行和空行。 如果 header 为 true,则标头将是第一个未跳过和未注释的行。
timeFormat HH:mm:ss 时间格式字符串 分析 TimeType 列值的格式。
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] 时间戳格式字符串 用于分析时间戳字符串的格式。
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] 时间戳格式字符串 不带时区 (TimestampNTZType) 字符串的时间戳分析格式。
timeZone None 字符串java.time.ZoneId 分析时间戳和日期时要使用的 java.time.ZoneId
unescapedQuoteHandling STOP_AT_DELIMITER STOP_AT_CLOSING_QUOTEBACK_TO_DELIMITERSTOP_AT_DELIMITERSKIP_VALUERAISE_ERROR 用于处理未转义的引号的策略。 每个允许选项的行为如下所示:
  • STOP_AT_CLOSING_QUOTE:如果在输入中找到了未转义引号,则会累积引号字符并继续将值解析为带引号值,直到找到右引号。
  • BACK_TO_DELIMITER:如果在输入中找到了未转义引号,则将该值视为无引号值。 这会让分析程序累积当前所分析值的所有字符,直至找到 sep 定义的分隔符。 如果在值中找不到分隔符,则分析程序会继续从输入中累积字符,直到找到分隔符或行尾。
  • STOP_AT_DELIMITER:如果在输入中找到了未转义引号,则将该值视为无引号值。 这会让分析程序累积所有字符,直至在输入中找到 sep 定义的分隔符或找到行尾。
  • SKIP_VALUE:如果在输入中发现了未转义的引号,则将跳过针对给定值所解析的内容(直至找到下一个分隔符),并将改为生成 nullValue 中设置的值。
  • RAISE_ERROR:如果在输入中找到未转义引号,将引发一个 TextParsingException

Excel

读取Excel文件时,以下选项适用。

密钥 默认 有效值 说明
dataAddress None 单元格区域或工作表名称字符串 要以 Excel 语法读取的单元格区域。 如果省略,则从第一个工作表读取所有有效单元格。 用于 SheetName!C5:H10 从命名工作表读取范围、 C5:H10 从第一个工作表读取区域或 SheetName 从特定工作表读取所有数据。
headerRows 0 01 用作列名标题的初始行数。 指定后 dataAddress ,这将在单元格范围内应用。 当0,列名称将自动生成为_c1_c2_c3等等。
ignoreMissingSheet false truefalse 是否以无提示方式跳过不包含指定 dataAddress工作表的文件。 如果 false文件缺少请求的工作表,则会引发错误。 仅在指定工作表名称 dataAddress时适用。
includePhoneticRuns false truefalse 在读取 XLSX 文件时,是否包含拼音注释(如 pinyin 或 furigana)连接到单元格字符串值。
operation readSheet readSheetlistSheets 对Excel工作簿执行的操作。 readSheet 从工作表中读取数据。 listSheets 返回一个包含字段 sheetIndex: longsheetName: String 每个工作表的结构。
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] 时间戳格式字符串 在Excel中存储为字符串的时间戳无时区值的自定义格式字符串。 自定义日期格式遵循 Datetime 模式中的格式。
dateFormat yyyy-MM-dd 日期格式字符串 读取为字符串 Date值的自定义格式字符串。 自定义日期格式遵循 Datetime 模式中的格式。

JSON

读取 JSON 文件时,以下选项适用。

密钥 默认 有效值 说明
allowBackslashEscapingAnyCharacter false truefalse 是否允许反斜杠对其后面的任何字符进行转义。 如果未启用,则只能对按 JSON 规范显式列出的字符进行转义。
allowComments false truefalse 是否允许在分析的内容中使用 Java、C 和 C++ 样式的注释('/''*''//' 变体)。
allowNonNumericNumbers true truefalse 是否允许将非数字 (NaN) 标记集用作合法浮点数字值。
allowNumericLeadingZeros false truefalse 是否允许整数以附加的(可忽略的)零开头(例如 000001)。
allowSingleQuotes true truefalse 是否允许使用单引号(撇号字符 '\')来引用字符串(名称和字符串值)。
allowUnquotedControlChars false truefalse 是否允许 JSON 字符串包含未转义的控制字符(值小于 32 的 ASCII 字符,包括制表符和换行符)。
allowUnquotedFieldNames false truefalse 是否允许使用 JavaScript 允许的未引用字段名称,但不允许由 JSON 规范使用。
alternateVariantEncoding None Z85 用于源 JSON 中 Variant 值的编码。 设置为 Z85 解码 Base85 编码的 Variant 值,而不是存储为内联 JSON。
badRecordsPath None 路径字符串 用于存储记录错误 JSON 信息的文件路径。
在基于文件的数据源中使用badRecordsPath选项有以下限制:
  • 这是非事务性的,可能会导致结果不一致。
  • 暂时性错误被视为故障。
columnNameOfCorruptRecord _corrupt_record 列名字符串 用于存储因格式不正确而无法分析的记录的列。 如果用于分析的 mode 设置为 DROPMALFORMED,则此列将为空。
dateFormat yyyy-MM-dd 日期格式字符串 用于分析日期字符串的格式。
dropFieldIfAllNull false truefalse 在进行架构推理期间是否忽略所有 null 值或空数组和结构的列。
encodingcharset UTF-8 名称java.nio.charset.Charset JSON 文件编码的名称。 有关选项列表,请参阅 java.nio.charset.Charset。 当 UTF-16UTF-32 时,不能使用 multilinetrue
inferTimestamp false truefalse 是否尝试将时间戳字符串推理为 TimestampType。 设置为 true架构推理时,架构推理可能需要明显更长的时间。 必须启用 cloudFiles.inferColumnTypes 才能与自动加载程序一起使用。
lineSep 无,涵盖 \r\r\n\n 一个字符串 两个连续 JSON 记录之间的字符串。
locale US 标识符java.util.Locale 影响 JSON 中默认日期、时间戳和十进制分析的Java区域设置标识符。
maxNestingDepth 500 正整数 JSON 对象和数组允许的最大嵌套深度。 为深度嵌套文档增加此值。
maxNumLen 1000 正整数 JSON 输入中数字令牌的最大长度。 为包含大数值文本的 JSON 增加此值。
maxStringLen 不限制 正整数 JSON 输入中字符串值的最大长度。 设置为使用大型字符串分析 JSON 时限制内存使用量。
mode PERMISSIVE PERMISSIVEDROPMALFORMEDFAILFAST 围绕处理格式错误的记录提供的分析程序模式。
multiLine false truefalse JSON 记录是否跨多行。
prefersDecimal false truefalse 如果可能,尝试将字符串推断为 DecimalType 而不是浮点型或双精度型。 此外,还必须通过启用 inferSchema 或使用 cloudFiles.inferColumnTypes 自动加载程序来使用架构推理。
primitivesAsString false truefalse 是否将数字和布尔值等基元类型推理为 StringType
readerCaseSensitive true truefalse 指定启用 rescuedDataColumn 时区分大小写的行为。 如果为 true,请拯救名称不同于架构的数据列。 如果为 false,则以不区分大小写的方式读取数据。 在 Databricks Runtime 13.3 及更高版本中可用。
rescuedDataColumn None 列名字符串 是否将因数据类型不匹配或架构不匹配(包括列大小写)而无法分析的所有数据收集到单独的列。 使用自动加载程序时,默认包含此列。 有关更多详细信息,请参阅什么是已恢复的数据列?
COPY INTO(旧版)不支持已获救的数据列,因为无法使用 COPY INTO手动设置架构。 Databricks 建议对大多数引入方案使用自动加载程序。
singleVariantColumn None 列名字符串 是否引入整个 JSON 文档,分析为具有指定字符串的单个 Variant 列作为列的名称。 如果未设置,JSON 字段将引入其自己的列。
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] 时间戳格式字符串 用于分析时间戳字符串的格式。
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] 时间戳格式字符串 不带时区 (TimestampNTZType) 字符串的时间戳分析格式。
timeZone None 字符串java.time.ZoneId 分析时间戳和日期时要使用的 java.time.ZoneId
upgradeExceptionAsBadRecord false truefalse 是否将类型升级异常(例如,当值不能扩大为声明的列类型时)视为错误的记录,而不是引发异常。

Kafka

有关 Kafka 读取器选项的完整列表,请参阅 DataStreamReader Kafka 选项。 以下选项仅适用于使用 spark.read.format("kafka").

密钥 默认 有效值 说明
endingOffsets latest latest或 JSON 偏移字符串 停止阅读的位置。 在 JSON 字符串中, -1 是最新的偏移量。 -2作为最早的偏移量,不允许作为结束偏移量。 这是一个 JSON 偏移字符串示例: {"topicA":{"0":50,"1":-1}}
endingOffsetsByTimestamp None JSON 时间戳字符串 以毫秒为单位指定为时间戳的按分区结束偏移量。 例如: {"topicA":{"0":2000,"1":3000}}
endingTimestamp None 正整数或 0 应用于所有分区的全局结束时间戳(以毫秒为单位)。

ORC

读取 ORC 文件时,以下选项适用。

密钥 默认 有效值 说明
mergeSchema false truefalse 是否在多个文件中推断模式并合并每个文件的模式。

Parquet

读取 Parquet 文件时,以下选项适用。

密钥 默认 有效值 说明
datetimeRebaseMode LEGACY EXCEPTIONLEGACYCORRECTED 控制 DATE 和 TIMESTAMP 值在儒略历与外推格里历之间的基本值重定。
int96RebaseMode LEGACY EXCEPTIONLEGACYCORRECTED 控制 INT96 时间戳值在儒略历与外推格里历之间的基本值重定。
mergeSchema false truefalse 是否在多个文件中推断模式并合并每个文件的模式。
readerCaseSensitive true truefalse 指定启用 rescuedDataColumn 时区分大小写的行为。 如果为 true,请拯救名称不同于架构的数据列。 如果为 false,则以不区分大小写的方式读取数据。
rescuedDataColumn None 列名字符串 是否将因数据类型不匹配和架构不匹配(包括列大小写)而无法分析的所有数据收集到单独的列。 使用自动加载程序时,默认包含此列。 有关更多详细信息,请参考什么是已恢复的数据列?
COPY INTO(旧版)不支持已获救的数据列,因为无法使用 COPY INTO手动设置架构。 Databricks 建议对大多数引入方案使用自动加载程序。

状态存储

将这些选项用于 spark.read.format("statestore")read_statestore 表值函数来读取结构化流式处理状态数据。 请参阅读取结构化流式处理状态信息

密钥 默认 有效值 说明
batchId 最新批处理 ID 正整数或 0 要从中读取的目标批处理。 用于查询查询的早期状态。 该批必须提交,但尚未清理。
operatorId 0 正整数或 0 要从中读取的目标运算符。 当查询具有多个有状态运算符时使用。
storeName DEFAULT 任意字符串 要从中读取的目标状态存储名称。 当有状态运算符具有多个状态存储实例时使用。 必须指定流联接或storeNamejoinSide同时指定流联接,但不能同时指定两者。
joinSide None leftright 要从中读取流联接的目标端。 必须指定流联接或storeNamejoinSide同时指定流联接,但不能同时指定两者。
snapshotStartBatchId None 正整数或 0 在读取状态时用作起始点的快照的批处理 ID。 读取器通过重播此快照的更改重新生成状态,直到 batchId此快照。 当快照损坏时非常有用。 必须一 snapshotPartitionId起指定 。 不能与 .一起使用 readChangeFeed。 支持启用了更改日志检查点的 HDFS 支持的状态存储和 RocksDB 状态存储。 在 Databricks Runtime 15.4 LTS 及更高版本中可用。
snapshotPartitionId None 正整数或 0 如果指定,查询将仅读取此分区。 必须一 snapshotStartBatchId起指定 。 不能与 .一起使用 readChangeFeed。 在 Databricks Runtime 15.4 LTS 及更高版本中可用。
readChangeFeed false truefalse true,返回指定范围之间的批和changeStartBatchIdchangeEndBatchId之间的状态更改。 需要 changeStartBatchId。 不能与 joinSidebatchIdsnapshotStartBatchIdsnapshotPartitionId. 在 Databricks Runtime 16.4 LTS 及更高版本中可用。
有关详细信息,请参阅 “读取结构化流式处理”状态更改
changeStartBatchId None 正整数或 0 更改源范围的起始批 ID。 当 readChangeFeedtrue 时为必需项。 仅当 readChangeFeed 设置为 true.. 在 Databricks Runtime 16.4 LTS 及更高版本中可用。
changeEndBatchId 最新批处理 ID 正整数或 0 更改源范围的结束批 ID。 必须大于或等于 changeStartBatchId。 仅当 readChangeFeed 设置为 true.. 在 Databricks Runtime 16.4 LTS 及更高版本中可用。
stateVarName None 任意字符串 要读取的状态变量名称。 状态变量名称是运算符使用的init函数中StatefulProcessor每个变量的唯一transformWithState名称。 使用 transformWithState 运算符时是必需的。 在 Databricks Runtime 16.4 LTS 及更高版本中可用。
readRegisteredTimers false truefalse true读取运算符使用的 transformWithState 已注册计时器时。 transformWithState仅适用于运算符。 在 Databricks Runtime 16.4 LTS 及更高版本中可用。
flattenCollectionTypes true truefalse true,平展为映射和列表状态变量返回的记录。 当 false,以 Spark SQL ArrayMap. transformWithState仅适用于运算符。 在 Databricks Runtime 16.4 LTS 及更高版本中可用。

文本

读取文本文件时,以下选项适用。

密钥 默认 有效值 说明
encoding UTF-8 名称java.nio.charset.Charset TEXT 文件行分隔符的编码的名称。 文件的内容不受此选项的影响,仍会读取as-is。
lineSep 无,涵盖\r\r\n\n 一个字符串 两个连续 TEXT 记录之间的字符串。
wholeText false truefalse 是否将文件读取为单个记录。

XML

读取 XML 文件时,以下选项适用。

密钥 默认 有效值 说明
rowTag None 任意字符串 要作为行处理的 XML 文件的行标记。 在示例 XML <book> <page><page>...<book> 中,相应的值为 page。 这是必需选项。
samplingRatio 1.0 0.01.0 定义用于架构推理的行的一部分。 XML 内置函数会忽略此选项。
excludeAttribute false truefalse 是否排除元素中的属性。
mode None PERMISSIVEDROPMALFORMEDFAILFAST 在解析过程中处理损坏的记录的一种模式。
  • PERMISSIVE:对于损坏的记录,将格式错误的字符串放入由 columnNameOfCorruptRecord 配置的字段中,并将格式错误的字段设置为 null。 若要保留损坏的记录,可以在用户定义的架构中设置名为 stringcolumnNameOfCorruptRecord 类型字段。 如果架构没有该字段,则会在分析期间删除损坏的记录。 推理架构时,分析程序会在输出架构中隐式添加 columnNameOfCorruptRecord 字段。
  • DROPMALFORMED:忽略损坏的记录。 XML 内置函数不支持此模式。
  • FAILFAST:分析程序遇到损坏的记录时引发异常。
inferSchema true truefalse 如果为 true,则尝试推断每个生成的 DataFrame 列的相应类型。 如果为 false,则生成的所有列均为 string 类型。 XML 内置函数会忽略此选项。
columnNameOfCorruptRecord spark.sql.columnNameOfCorruptRecord 列名字符串 允许重命名包含模式 PERMISSIVE 创建的格式不正确的字符串的新字段。
attributePrefix None 任意字符串 属性的前缀,用于区分属性和元素。 这将是字段名称的前缀。 默认值为 _。 读取 XML 时可以为空,但写入时不能为空。 也适用于 DataFrameWriter XML 选项
valueTag _VALUE 任意字符串 该标记用于同时具有属性元素或子元素的元素中的字符数据。 用户可以在架构中指定 valueTag 字段,或者当字符数据存在于具有其他元素或属性的元素中时,该字段将在架构推断期间自动添加。 也适用于 DataFrameWriter XML 选项
encoding UTF-8 名称java.nio.charset.Charset 若要读取,请根据给定的编码类型解码 XML 文件。 对于写入,请指定已保存的 XML 文件的编码(字符集)。 XML 内置函数会忽略此选项。 也适用于 DataFrameWriter XML 选项
ignoreSurroundingSpaces true truefalse 是否必须跳过围绕值的空格。 将忽略只有空格的字符数据。
rowValidationXSDPath None 文件路径字符串 可选 XSD 文件的路径,用于单独验证每行的 XML。 未能验证的行被视为分析错误。 XSD 不会影响架构,无论是指定的还是推断的。
ignoreNamespace false truefalse 如果为 true,则忽略 XML 元素和属性上的命名空间前缀。 例如,标记 <abc:author><def:author> 会被视为只是 <author>。 不能忽略 rowTag 元素上的命名空间,只忽略其读取子元素。 XML 解析不识别命名空间,即使在有 false 的情况下。
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] 时间戳格式字符串 遵循 日期时间模式 格式的自定义时间戳格式字符串。 它适用于 timestamp 类型。 也适用于 DataFrameWriter XML 选项
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] 时间戳格式字符串 遵循日期/时间模式格式的不带时区的自定义时间戳格式字符串。 这适用于 TimestampNTZType 类型。 也适用于 DataFrameWriter XML 选项
dateFormat yyyy-MM-dd 日期格式字符串 遵循 日期时间模式 格式的自定义日期格式字符串。 这适用于日期类型。 也适用于 DataFrameWriter XML 选项
locale en-US IETF BCP 47 语言标记 将地区设置为 IETF BCP 47 格式的语言标签。 例如,在分析日期和时间戳时使用 locale
nullValue 字符串 null 任意字符串 设置 null 值的字符串表示形式。 当为 null 时,分析程序不会为字段写入属性和元素。 也适用于 DataFrameWriter XML 选项
readerCaseSensitive true truefalse 指定启用 rescuedDataColumn 时区分大小写的行为。 如果为 true,请拯救名称不同于架构的数据列。 如果为 false,则以不区分大小写的方式读取数据。
rescuedDataColumn None 列名字符串 是否将因数据类型不匹配和架构不匹配(包括列大小写)而无法解析的所有数据收集到单独的列中。 使用自动加载程序时,默认包含此列。 有关更多详细信息,请参阅什么是被恢复的数据列? COPY INTO(旧版)不支持已获救的数据列,因为无法使用 COPY INTO手动设置架构。 Databricks 建议对大多数引入方案使用自动加载程序。
singleVariantColumn none 列名字符串 指定单个变体列的名称。 如果指定此选项用于读取,将整个 XML 记录解析为一个单个 Variant 列,并将该选项字符串值作为列名称。 如果为写入指定此选项,将单个 Variant 列的值写入 XML 文件。 也适用于 DataFrameWriter XML 选项
useLegacyXMLParser true truefalse 是否使用旧版 XML 分析程序。 旧版分析程序对格式不正确的内容进行了不太严格的验证,但内存效率较低。 false设置为选择使用更严格的默认分析程序。
wildcardColName xs_any 列名字符串 用于捕获与通配符 (xs:any) 架构元素匹配的 XML 元素的列名。 不能与 rescuedDataColumn.一起使用。

DataStreamReader 选项

使用这些选项 DataStreamReader.option() 配置 Delta Lake 表和其他基于文件的源的流式读取。

有关文件格式选项(JSON、CSV、Parquet 等),请参阅 DataFrameReader 选项

有关自动加载程序(cloudFiles.*)选项,请参阅 自动加载程序

Example

以下示例设置为 maxFilesPerTrigger10 Delta Lake 表流:

Python
df = spark.readStream.format("delta").option("maxFilesPerTrigger", 10).load("/path/to/delta-table")
Scala
val df = spark.readStream.format("delta").option("maxFilesPerTrigger", "10").load("/path/to/delta-table")

Common

以下选项适用于 Delta Lake 表和其他基于文件的流式处理源。

密钥 默认 有效值 说明
cleanSource off offdeletearchive 如何在流处理源文件后处理源文件。 off 不执行任何操作。 delete 永久删除源文件。 archive 将文件移动到 sourceArchiveDirarchive设置为时,sourceArchiveDir还必须设置。 不适用于 Delta Lake 表流式处理。
fileNameOnly false truefalse 是否仅按文件名而不是完整路径标识已处理的文件。 当 true,具有相同文件名的不同路径的文件将被视为同一文件,并且不会重新处理。 不适用于 Delta Lake 表流式处理。
latestFirst false truefalse 是否在每个微批处理中首先处理最近修改的文件。 在希望尽快处理最新数据时非常有用。 当和truemaxFilesPerTrigger已设置时maxBytesPerTriggermaxFileAge将忽略。 不适用于 Delta Lake 表流式处理。
maxBytesPerTrigger None 正整数 每个微批处理的数据量的软最大值。 如果最小输入单位超过限制,批处理可能会处理超过限制。 一起使用 maxFilesPerTrigger时,微批处理会处理数据,直到达到任何一个限制。
对于自动加载程序,请改用 cloudFiles.maxBytesPerTrigger。 请参阅 “常见”。
maxCachedFiles 10000 正整数或 0 要缓存后续微批处理的未处理文件的最大数量。 设置为 0 关闭缓存。 当源目录包含每个触发器的许多新文件时增加此值。 不适用于 Delta Lake 表流式处理。
maxFileAge 7d 持续时间字符串,例如 7d4h 考虑处理的文件的最大期限,相对于最近修改的文件的时间戳,而不是当前系统时间。 忽略超过此阈值的文件。 在设置latestFirsttrue设置maxFilesPerTriggermaxBytesPerTrigger被忽略。 不适用于 Delta Lake 表流式处理。
maxFilesPerTrigger 1000 用于 Delta Lake 和自动加载程序。 对于其他基于文件的源,没有最大值。 正整数 每个微批处理中处理的新文件数上限。 一起使用 maxBytesPerTrigger时,微批处理会处理数据,直到达到任何一个限制。
对于自动加载程序,请改用 cloudFiles.maxFilesPerTrigger。 请参阅 “常见”。
sourceArchiveDir None 路径字符串 设置为 cleanSourcearchive存档目录的路径。 源文件在处理后移动到此路径,保留其相对目录结构。 不适用于 Delta Lake 表流式处理。

自动加载器

将这些选项与源配合使用 cloudFiles ,为从云存储进行流式引入配置 自动加载程序 。 特定于 cloudFiles 源的选项带有 cloudFiles 前缀,以将它们保留在与其他 结构化流 源选项不同的命名空间中。

Common

以下选项适用于所有自动加载程序配置。

密钥 默认 有效值 说明
cloudFiles.allowOverwrites false truefalse 是否允许输入目录文件更改替代现有数据。
有关配置注意事项,请参阅自动加载程序在追加或覆盖文件时是否再次处理文件?
cloudFiles.backfillInterval None 持续时间字符串,例如 1 day1 week 自动加载程序可以按给定间隔触发异步回填。 有关详细信息,请参阅 使用 cloudFiles.backfillInterval 触发常规回填
请勿在 cloudFiles.useManagedFileEvents 被设置为 true 时使用。
cloudFiles.cleanSource OFF OFFDELETEMOVE 是自动删除还是从输入目录中移动已处理的文件。 设置为 OFF (默认值)时,不会删除任何文件。
设置为 DELETE时,自动加载程序会在处理文件 30 天后自动删除文件。 为此,自动加载程序必须具有对源目录的写入权限。
设置为 MOVE 后,自动加载程序将在文件被 cloudFiles.cleanSource.moveDestination 处理后 30 天内自动移动到指定位置。 为此,自动加载程序必须对源目录以及移动位置具有写入权限。
当文件在表值函数的结果commit_time中具有非 null 值cloud_files_state时,会将其视为已处理。 请参阅 cloud_files_state 表值函数。 可以使用 cloudFiles.cleanSource.retentionDuration配置处理后的 30 天额外等待时间。
启用 cloudFiles.cleanSource之前,请查看以下注意事项:
  • 如果源位置有多个流使用数据,则Azure Databricks不建议使用此选项,因为最快的使用者将删除文件,并且它们不会引入较慢的源中。
  • 启用此功能需要自动加载程序在其检查点中维护其他状态,这会产生性能开销,但可以通过 cloud_files_state 表值函数提高可观测性。 请参阅 cloud_files_state 表值函数
  • cleanSource使用当前设置来确定是还是MOVEDELETE给定文件。 例如,假设文件最初处理时的设置是MOVE,但在30天后文件成为清理候选时已更改为DELETE。 在这种情况下,cleanSource 将删除该文件。
  • 一旦 retentionDuration 过期,不保证立即清理文件。 为了降低成本,自动加载程序会随流处理同时删除文件,并在流处理完成或终止后立即终止。 在下次运行自动加载程序时,将选取适合清理但无法在流处理期间清理的文件。

在 Databricks Runtime 16.4 及更高版本中可用。
cloudFiles.cleanSource.retentionDuration 30 days CalendarInterval 字符串,例如14 days2 weeks1 month 在已处理文件成为通过 cleanSource 进行存档的候选文件之前需等待的时间。 对于 DELETE 操作来说必须长于 7 天。 对于 MOVE 操作来说没有最低限制。
在 Databricks Runtime 16.4 及更高版本中可用。
cloudFiles.cleanSource.moveDestination None 云存储或 Unity 目录卷路径 cloudFiles.cleanSource 设置为 MOVE 时已处理文件的存档路径。 这可以是云存储路径或 Unity 目录卷 路径(例如 /Volumes/my_catalog/my_schema/my_volume/archive/)。
移动位置必须:
  • 不是源目录的子级。 如果将移动目标放置在源目录中,则会再次引入存档的文件。
  • 与源位于同一外部位置、卷或 DBFS 装载中。 不支持在不同存储桶和存储容器之间移动文件,将导致错误。

自动加载程序必须对此目录具有写入权限。
在 Databricks Runtime 16.4 及更高版本中可用。
cloudFiles.format 无(必需选项) avrobinaryFilecsvjsonorcparquettextxml 源路径中的数据文件格式。 有效值包括:
cloudFiles.includeExistingFiles true truefalse 是包含流式处理输入路径中的现有文件,还是仅处理初始设置后到达的新文件。 仅在您首次启动流时会对该选项进行评估。 在重启流后更改此选项不起作用。
cloudFiles.inferColumnTypes false truefalse 在利用架构推理时是否推断确切的列类型。 默认情况下,在推断 JSON 和 CSV 数据集时,列将被推断为字符串。 有关更多详细信息,请参阅架构推理
cloudFiles.maxBytesPerTrigger None 字节字符串,例如 10g 每次触发时要处理的最大新字节数。 这是一个软性最大值。 如果每个文件为 3 GB,则 Azure Databricks 在一个微批中可以处理 12 GB。 与 cloudFiles.maxFilesPerTrigger 一起使用时,Azure Databricks 会消耗到 cloudFiles.maxFilesPerTriggercloudFiles.maxBytesPerTrigger 中的最低限制(以先到达者为准)。 与 Trigger.Once()Trigger.Once() 已弃用)一起使用时,此选项不起作用。
在 Databricks Runtime 18.0 及更高版本中,此选项是动态配置的,不需要手动设置。
cloudFiles.maxFileAge None 持续时间字符串 出于重复数据删除目的而跟踪文件事件的时长。 Databricks 建议不要优化此参数,除非你是在以每小时数百万个文件的速度引入数据。 有关更多详细信息,请参阅 文件事件跟踪 部分。
过于激进地调整 cloudFiles.maxFileAge 可能会导致数据质量问题,例如重复引入或缺少文件。 因此,Databricks 建议为 cloudFiles.maxFileAge 使用保守设置,例如 90 天,这与类似数据引入解决方案建议的值相当。
cloudFiles.maxFilesPerTrigger 1000 正整数 要在每个触发器中处理的最大新文件数。 与 cloudFiles.maxBytesPerTrigger 一起使用时,Azure Databricks 会消耗到 cloudFiles.maxFilesPerTriggercloudFiles.maxBytesPerTrigger 中的最低限制(以先到达者为准)。 与 Trigger.Once()(已弃用)一起使用时,此选项不起作用。
在 Databricks Runtime 18.0 及更高版本中,此选项是动态配置的,不需要手动设置。
cloudFiles.partitionColumns None 以逗号分隔的列名称列表 要从文件的目录结构推断的 Hive 样式分区列的逗号分隔列表。 Hive 样式的分区列是由等号(例如 <base-path>/a=x/b=1/c=y/file.format)组合的键值对。 在此示例中,分区列为 abc。 如果使用架构推理并指定 <base-path> 从中加载数据,则这些列会自动添加到架构中。 如果指定架构,自动加载程序要求这些列包含在架构中。 如果你不希望这些列成为架构的一部分,则可以指定 "" 以忽略这些列。 此外,当你希望在复杂目录结构中通过文件路径推断列时,可以使用此选项,如下面的示例所示:
<base-path>/year=2022/week=1/file1.csv
<base-path>/year=2022/month=2/day=3/file2.csv
<base-path>/year=2022/month=2/day=4/file3.csv
cloudFiles.partitionColumns 指定为 year,month,day 会针对 year=2022 返回 file1.csv,但 monthday 列为 null
monthday 已正确解析为 file2.csvfile3.csv
cloudFiles.schemaEvolutionMode addNewColumns 如果未指定架构, none 则为 addNewColumnsnonerescuefailOnNewColumns 在数据中发现新列时对架构进行演变的模式。 默认情况下,在推断 JSON 数据集时,将列推断为字符串。 有关更多详细信息,请参阅架构演变
cloudFiles.schemaHints None 架构字符串 在架构推理过程中指定给自动加载程序的架构信息。 有关更多详细信息,请参阅架构提示
cloudFiles.schemaLocation 无(推断架构所需的) 路径字符串 存储推断出的架构和后续更改的位置。 有关更多详细信息,请参阅架构推理
cloudFiles.useStrictGlobber false truefalse 是否使用与 Apache Spark 中其他文件源的默认通配行为相匹配的严格通配符。 有关更多详细信息,请参阅常见数据加载模式。 在 Databricks Runtime 12.2 LTS 及更高版本中可用。
cloudFiles.validateOptions true truefalse 是否对自动加载器选项进行验证,并对未知或不一致的选项返回错误。

目录列表

使用目录列表模式时,适用以下选项。

密钥 默认 有效值 说明
cloudFiles.useIncrementalListing(已弃用) auto 在 Databricks Runtime 17.2 及更低版本中, false 在 Databricks Runtime 17.3 及更高版本上 autotruefalse 此功能已弃用。 Databricks 建议将 文件通知模式用于文件事件 而不是使用 cloudFiles.useIncrementalListing
是否在目录列表模式下使用增量列表而不是完整列表。 默认情况下,自动加载程序尽最大努力自动检测给定目录是否适用于增量列表。 可以显式使用增量列表,或者通过将完整目录列表分别设置为 truefalse 来使用该列表。
错误地在非按词汇排序的目录上启用增量列表会阻止自动加载程序发现新文件。
适用于 Azure Data Lake Storage(abfss://)、S3(s3://)和 GCS(gs://)。
在 Databricks Runtime 9.1 LTS 及更高版本中可用。

文件通知

有关配置文件通知模式(包括所需的云权限、设置说明和身份验证方法)的信息,请参阅 在文件通知模式下配置自动加载程序流

密钥 默认 有效值 说明
cloudFiles.fetchParallelism 1 正整数 从队列服务中提取消息时要使用的线程数。
请勿在 cloudFiles.useManagedFileEvents 被设置为 true 时使用。
cloudFiles.pathRewrites None JSON 映射字符串 仅当指定接收 queueUrl 来自多个 S3 存储桶的文件通知并且想要使用配置为访问这些容器中的数据的装入点时,才是必需的。 借助此选项可以使用装入点重写 bucket/key 路径的前缀。 只能重写前缀。 例如,对于配置 {"<databricks-mounted-bucket>/path": "dbfs:/mnt/data-warehouse"},路径 s3://<databricks-mounted-bucket>/path/2017/08/fileA.json 将重写为 dbfs:/mnt/data-warehouse/2017/08/fileA.json
请勿在 cloudFiles.useManagedFileEvents 被设置为 true 时使用。
cloudFiles.resourceTag None 键值标记字符串 一系列键值标记对,可帮助关联和识别相关资源,例如:
cloudFiles.option("cloudFiles.resourceTag.myFirstKey", "myFirstValue")
.option("cloudFiles.resourceTag.mySecondKey", "mySecondValue")
请勿在 cloudFiles.useManagedFileEvents 被设置为 true 时使用。 而是使用云提供程序控制台设置资源标记。
有关详细信息,请参阅 云提供程序资源标记
cloudFiles.useManagedFileEvents false truefalse 设置为 true时,Auto Loader 使用文件事件服务来检测外部位置中的文件。 仅当加载路径位于启用了文件事件的外部位置时,才能使用此选项。 请参阅 将文件通知模式与文件事件配合使用
文件事件在文件发现中提供通知级性能,因为自动加载程序可以在上次运行后发现新文件。 与目录列表不同,此过程不需要列出目录中的所有文件。
在某些情况下,即使启用了文件事件选项,自动加载程序也使用目录列表:
  • 在初始加载期间,当 includeExistingFiles 设置为 true 时,会进行一个完整的目录列表,以发现自动加载程序启动之前目录中存在的所有文件。
  • 文件事件服务通过缓存最近创建的文件来优化文件发现。 如果自动加载程序不经常运行,则此缓存可能会过期,自动加载程序会回退到目录列表以发现文件和更新缓存。 为了避免这种情况,请至少每七天调用一次自动加载程序。
在 Databricks Runtime 14.3 LTS 及更高版本中可用。
cloudFiles.listOnStart false truefalse 设置为 /> 时,自动加载程序会在流启动时执行完整目录列表,而不是从检查点中的继续标记开始。 使用此选项可从错误(如 . ) CF_MANAGED_FILE_EVENTS_INVALID_CONTINUATION_TOKEN中恢复。 请参阅如何从CF_MANAGED_FILE_EVENTS_INVALID_CONTINUATION_TOKEN错误中恢复?
cloudFiles.useNotifications false truefalse 是否使用文件通知模式来确定何时存在新文件。 如果为 false,则使用目录列表模式。 请参阅比较自动加载器文件检测模式
请勿在 cloudFiles.useManagedFileEvents 被设置为 true 时使用。

云提供商资源标记

默认情况下,自动加载程序会尽力添加以下键值标记对:

  • vendor: Databricks
  • path:从中加载数据的位置。 由于标签限制,在 GCP 中不可用。
  • checkpointLocation:流检查点的位置。 由于标签限制,在 GCP 中不可用。
  • streamId:流的全局唯一标识符。

Databricks 保留这些密钥名称,并且无法覆盖其值。

有关 Azure 的详细信息,请参阅命名队列和元数据以及properties.labels中的覆盖范围。 自动加载程序将这些键值标记对以 JSON 格式存储为标签。

特定于云

自动加载程序提供了用于为文件通知模式配置云基础结构的选项。 有关所需的云权限和设置说明,请参阅 在文件通知模式下配置自动加载程序流

蔚蓝

如果指定 cloudFiles.useNotifications = true 并且希望自动加载程序为你设置通知服务,则必须为以下所有选项指定值:

密钥 默认 有效值 说明
cloudFiles.resourceGroup None 任意字符串 在其中创建存储帐户的Azure资源组。
cloudFiles.subscriptionId None 任意字符串 在其中创建资源组的Azure订阅 ID。
databricks.serviceCredential None 任意字符串 您的 Databricks 服务凭据的名称。 在 Databricks Runtime 16.1 及更高版本中可用。

如果 Databricks 服务凭据不可用,可以改为指定以下身份验证选项:

密钥 默认 有效值 说明
cloudFiles.clientId None 任意字符串 服务主体的客户端 ID 或应用程序 ID。
cloudFiles.clientSecret None 任意字符串 服务主体的客户端密钥。
cloudFiles.connectionString None 一个连接字符串 存储帐户的连接字符串,基于帐户访问密钥或共享访问签名 (SAS)。
cloudFiles.tenantId None 任意字符串 在其中创建服务主体的Azure租户 ID。

仅当设置 cloudFiles.useNotifications = true 并且希望自动加载程序使用现有队列时,才指定以下选项:

密钥 默认 有效值 说明
cloudFiles.queueName None 任意字符串 Azure 队列的名称。 如果指定,云文件源将直接使用此队列中的事件,而不是设置自己的Azure 事件网格和队列存储服务。 在这种情况下,databricks.serviceCredentialcloudFiles.connectionString 只需要对队列具有读取权限。

Delta Lake

使用以下 spark.readStream选项从 Delta Lake 表读取时适用。

密钥 默认 有效值 说明
allowSourceColumnDrop None 版本号或 always 设置为 Delta 表版本号或 always 允许流在从源表架构中删除列后继续。 当设置为版本号时,确认所有架构更改都达到该版本。 需要 schemaTrackingLocation。 请参阅 有关使用 Delta Lake 列映射重命名和删除列的说明
allowSourceColumnRename None 版本号或 always 设置为 Delta 表版本号或 always 允许流在源表中重命名列后继续。 当设置为版本号时,确认所有架构更改都达到该版本。 需要 schemaTrackingLocation。 请参阅 有关使用 Delta Lake 列映射重命名和删除列的说明
allowSourceColumnTypeChange None 版本号或 always 设置为 Delta 表版本号或 always 允许流在源表中更改列类型后继续。 当设置为版本号时,确认所有架构更改都达到该版本。 需要 schemaTrackingLocation。 请参阅类型扩展
excludeRegex None Java正则表达式字符串 正则表达式模式。 与模式匹配的文件从流读取中排除。 用于筛选不符合预期命名约定的文件。
failOnDataLoss true truefalse 如果源数据由于日志保留()而删除了流式处理查询,logRetentionDuration是否失败。 设置为 false 跳过缺失的数据并继续处理。 请参阅为“按时间顺序查看”查询配置数据保留
ignoreChanges(已弃用) false truefalse 在 Databricks Runtime 11.3 LTS 和更低版本中可用。 在修改操作(如UPDATEMERGE INTODELETEOVERWRITE)后重新发出重写数据文件。 可以与新行一起发出未更改的行,因此下游使用者必须处理重复项。 删除操作不会传播到下游。 替换为 skipChangeCommits Databricks Runtime 12.2 LTS 及更高版本。
ignoreDeletes(已弃用) false truefalse 忽略在分区边界处删除数据的事务(仅删除完整分区)。 不处理非分区删除、更新或其他修改。 请改用 skipChangeCommits
readChangeFeedreadChangeData false truefalse 是否启用读取流式处理查询的更改数据馈送。 启用后,流会发出包含其他元数据列的行级更改(插入、更新和删除)。 请参阅在 Azure Databricks 中使用更改数据馈送
schemaTrackingLocation None 路径字符串 Delta Lake 跟踪流读取的架构更改的目录的路径。 当从启用了列映射的表进行流式处理以及使用 allowSourceColumn* 选项处理架构演变时是必需的。 必须位于 checkpointLocation 流式处理查询中。 请参阅 有关使用 Delta Lake 列映射重命名和删除列的说明
skipChangeCommits false truefalse 忽略删除或修改现有记录和进程仅追加的事务。 Databricks 建议对不使用更改数据馈送的大多数工作负荷使用此选项。 在 Databricks Runtime 12.2 LTS 及更高版本中可用。 请参阅 使用 跳过上游更改提交 skipChangeCommits
startingTimestamp 最新可用 时间戳字符串,例如 2019-01-01T00:00:00.000Z 或日期字符串,例如 2019-01-01 要从中读取的时间戳。 该流读取指定时间戳或之后提交的所有表更改。 如果时间戳位于所有可用表提交之前,则流从最早的可用提交开始。 不能与 startingVersion.一起使用。 如果流式处理检查点已存在,则忽略。
startingVersion 最新可用 正整数, 0latest 要从中读取的增量表版本。 该流读取指定版本或之后提交的所有更改。 指定 latest 仅从最近的更改开始。 不能与 startingTimestamp.一起使用。 如果流式处理检查点已存在,则忽略。 参见 使用表历史记录
withEventTimeOrder false truefalse 将初始表快照划分为事件时间桶,以防止记录被错误地标记为后期事件,并在带有水印的有状态查询中丢弃。 在开始初始快照处理后无法更改,而无需删除检查点。 在 Databricks Runtime 11.3 LTS 及更高版本中可用。 请参阅 “处理初始快照而不删除数据”。

Kafka

将这些选项与以下选项一起使用: spark.readStream.format("kafka")spark.read.format("kafka")

密钥 默认 有效值 说明
assign None JSON 字符串,例如 {"topicA":[0,1],"topicB":[2,4]} 要使用的特定分区。 必须准确指定其中一个subscribesubscribePatternassign选项。
failOnDataLoss true truefalse 如果数据可能由于已删除主题或偏移截断而丢失,则查询是否失败。 设置为 false 跳过缺失的数据并继续。
Databricks 保守地估计数据是否丢失。 但是,这可能会导致误报。
fetchoffset.numretries 3 正整数或 0 提取 Kafka 偏移量失败时重试次数。
fetchoffset.retryintervalms 1000 正整数或 0 偏移提取重试之间的间隔(以毫秒为单位)。
groupIdPrefix spark-kafka-source (流式处理), spark-kafka-relation (批处理) 任意字符串 用于自动生成的 Kafka 使用者组 ID 的自定义前缀。 如果 kafka.group.id 显式设置,连接器将忽略此选项。
kafka.group.id None 任意字符串 读取时要使用的 Kafka 使用者组 ID。 请谨慎使用:共享同一组 ID 的查询相互干扰,并且可能只读取部分数据。 当运行并发批处理和流式处理工作负荷或快速重启查询时,可能会发生这种情况。 如果设置,则会忽略 groupIdPrefix。 若要最大程度地减少问题,请将 Kafka 使用者配置 session.timeout.ms 设置为较小的值。
includeHeaders false truefalse 是否将 Kafka 消息标头作为列包含在输出中。
kafkaconsumer.polltimeoutms None 正整数 Kafka 使用者 poll() 调用的超时(以毫秒为单位)。
kafka.bootstrap.servers None 字符串的 host:port 逗号分隔列表 Kafka 中转站的 host:port 地址的逗号分隔列表。 设置 Kafka 客户端 bootstrap.servers 的属性。
如果发现 Kafka 中没有数据,请检查此中转站地址列表以获取不正确的地址。 如果中转站地址列表不正确,则可能没有任何错误。 Kafka 客户端假设代理最终可用,并在收到网络错误时重试。
maxRecordsPerPartition None 正整数 每个 Spark 分区的最大记录数。 设置后,连接器会拆分 Kafka 分区,以便每个 Spark 分区最多读取这多条记录。
还可以将此选项与 minPartitions.. 当这两个选项都设置时,Spark 使用哪个选项会导致更多分区。
minPartitions None 正整数 要从 Kafka 读取的 Spark 分区的最小数目。 设置后,连接器会拆分大型 Kafka 分区以提高并行度。 如果未设置,Spark 会为每个 Kafka 主题分区创建一个分区。 用于处理数据倾斜或峰值负载。
此选项重新初始化每个触发器的 Kafka 使用者,这可能会影响 SSL 的性能。
startingOffsets latest (流式处理), earliest (批处理) earliestlatest或 JSON 偏移字符串 查询开始读取的偏移量。 在 JSON 字符串中, -1 是最新的偏移量。 -2 是最早的偏移量。 例如: {"topicA":{"0":23,"1":-2}}
对于流式处理查询,此选项仅适用于新查询启动时。 恢复的查询始终使用检查点。 在查询期间,新分区开始读取最早的偏移量。
对于批处理查询, latest 不允许使用。
startingOffsetsByTimestamp None JSON 时间戳字符串,例如 {"topicA":{"0":1000,"1":2000}} 每个分区的起始偏移量列表,指定为时间戳(以毫秒为单位)。 如果时间戳不存在偏移量,查询行为由 .startingOffsetsByTimestampStrategy
对于流式处理查询,此选项仅适用于新查询启动时。 恢复的查询始终使用检查点。 在查询期间,新分区开始读取最早的偏移量。
startingOffsetsByTimestampStrategy error errorlatest 为指定startingOffsetsByTimestampstartingTimestamp时间戳找到偏移量时要使用的策略。 error 引发异常。 latest 使用最新的可用偏移量。
startingTimestamp None 正整数或 0 应用于所有分区的全局起始时间戳(以毫秒为单位)。 如果时间戳不存在偏移量,则行为由 .startingOffsetsByTimestampStrategy
subscribe None 以逗号分隔的主题名称列表 要订阅的主题。 必须准确指定其中一个subscribesubscribePatternassign选项。
subscribePattern None Java正则表达式字符串 用于订阅主题的模式。 必须准确指定其中一个subscribesubscribePatternassign选项。 例如,topic.*

以下选项仅适用于流式读取:spark.readStream.format("kafka")

密钥 默认 有效值 说明
bytesEstimateWindowLength 300s 持续时间字符串,例如 10m600s 用于估算指标剩余字节 estimatedTotalBytesBehindLatest 的时间范围。 请参阅检索 Kafka 指标
maxOffsetsPerTrigger None 正整数 每个触发器间隔处理的最大偏移量。 偏移量按比例分布于主题分区。
maxTriggerDelay 15m 持续时间字符串,例如 10m600s 在触发之前等待 minOffsetsPerTrigger 累积的最长时间。
minOffsetsPerTrigger None 正整数 触发微批处理之前要累积的最小偏移量。 达到此时间 maxTriggerDelay 后,无论何时运行微批处理。

有关仅适用于批处理读取的 spark.read.format("kafka")偏移选项,请参阅 DataFrameReader Kafka 选项

身份验证

Databricks 建议使用 Unity 目录服务凭据向云托管的 Kafka 服务(AWS MSK、Azure 事件中心 或 Google Cloud Managed Kafka)进行身份验证。

密钥 默认 有效值 说明
databricks.serviceCredential None 任意字符串 用于向云管理的 Kafka 服务进行身份验证的 Unity 目录 服务凭据 的名称。 在 Databricks Runtime 16.1 及更高版本中可用。
databricks.serviceCredential.scope None 任意字符串 服务凭据的 OAuth 范围。 仅当Azure Databricks无法自动推断 Kafka 服务的范围时设置此项。

当服务凭据不可用时,请使用 SASL/SSL 选项(作为 kafka.* 属性传递)。 使用服务凭据时,无需指定kafka.sasl.mechanismkafka.sasl.jaas.configkafka.security.protocol指定。

密钥 默认 有效值 说明
kafka.security.protocol None 安全协议字符串,例如SASL_SSLSSLPLAINTEXT 代理通信的安全协议。
kafka.sasl.mechanism None SASL 机制字符串,例如PLAIN,、SCRAM-SHA-256SCRAM-SHA-512OAUTHBEARERAWS_MSK_IAM SASL 机制。
kafka.sasl.jaas.config None JAAS 配置字符串 JAAS 登录配置字符串。
kafka.sasl.login.callback.handler.class None 完全限定的类名 SASL 身份验证的登录回调处理程序的完全限定类名。
kafka.sasl.client.callback.handler.class None 完全限定的类名 用于 SASL 身份验证的客户端回调处理程序的完全限定类名。
kafka.ssl.truststore.location None 文件路径字符串 SSL 信任存储文件的路径。
kafka.ssl.truststore.password None 任意字符串 SSL 信任存储文件的密码。
kafka.ssl.keystore.location None 文件路径字符串 SSL 密钥存储文件的路径。
kafka.ssl.keystore.password None 任意字符串 SSL 密钥存储文件的密码。

有关完整的身份验证设置说明,请参阅 “身份验证”。

Pub/Sub

使用这些选项订阅 spark.readStream.format("pubsub") Google Pub/Sub。 选项subscriptionIdtopicIdprojectId必需选项。

密钥 默认 有效值 说明
subscriptionId None 任意字符串 必填。 Pub/Sub 订阅 ID。 连接器创建订阅(如果不存在)。
topicId None 任意字符串 必填。 Pub/Sub 主题 ID。
projectId None 任意字符串 必填。 Google Cloud 项目 ID。
numFetchPartitions 流初始化时可用的执行程序数的一半 正整数 从订阅中提取行的并行 Spark 任务数。
maxBytesPerTrigger None 正整数 每个微批处理要处理的字节数的软限制。
maxRecordsPerFetch 1000 正整数 在处理之前,要提取每个任务的行数。
maxFetchPeriod 10s 持续时间字符串,例如 1s1m 每个任务在处理行之前进行获取的时间长度。 Azure Databricks建议使用默认值。
deleteSubscriptionOnStreamStop false truefalse true流式处理查询结束时,订阅将从 subscriptionId中删除。
serviceCredential None 任意字符串 用于向 Pub/Sub 进行身份验证的Azure Databricks服务凭据的名称。 在 Databricks Runtime 16.1 及更高版本中可用。
clientEmail None 电子邮件地址字符串 Google 服务帐户的电子邮件地址。 不使用服务凭据时是必需的。
clientId None 任意字符串 Google 服务帐户的客户端 ID。 不使用服务凭据时是必需的。
privateKey None 私钥字符串 Google 服务帐户的私钥。 不使用服务凭据时是必需的。
privateKeyId None 任意字符串 Google 服务帐户的私钥 ID。 不使用服务凭据时是必需的。

Pulsar

使用这些选项 spark.readStream.format("pulsar") 从 Apache Pulsar 流式传输。 在 Databricks Runtime 14.1 及更高版本中可用。

需要以下选项。 必须准确指定其中一个topictopicstopicsPattern

密钥 默认 有效值 说明
service.url None Pulsar 服务 URL 字符串 例如,Pulsar 服务的 serviceURLPulsarpulsar://broker.example.com:6650
topic None 任意字符串 要使用的单一主题名称。
topics None 以逗号分隔的主题名称列表 要使用的主题名称的逗号分隔列表。
topicsPattern None Java正则表达式字符串 Java正则表达式字符串,以匹配主题名称。

还支持以下选项:

密钥 默认 有效值 说明
admin.url None URL 字符串 Pulsar 管理服务 HTTP URL。 设置时 maxBytesPerTrigger 是必需的。
allowDifferentTopicSchemas false truefalse 如果读取了具有不同架构的多个主题,请使用此选项关闭基于架构的自动主题值反序列化。 仅当此项为 true 时,才会返回原始值。
failOnDataLoss true truefalse 数据丢失时是否失败查询。 例如,删除主题或消息因保留策略而过期时,可能会丢失数据。
maxBytesPerTrigger None 正整数 每个微批处理要处理的字节数的软限制。 需要 admin.url
pollTimeoutMs 120000 正整数 从 Pulsar 读取消息的超时时间(以毫秒为单位)。
predefinedSubscription None 任意字符串 连接器用于跟踪 Spark 应用程序进度的预定义订阅名称。
startingOffsets latest latestearliest或 JSON 偏移字符串 从何处开始阅读。
subscriptionPrefix None 任意字符串 连接器用于生成随机订阅以跟踪 Spark 应用程序进度的前缀。
waitingForNonExistedTopic false truefalse 连接器是否等待,直到创建所需的主题。

可以使用以下选项模式指定其他 Pulsar 客户端、管理员和读取器配置:

Pattern 配置选项
pulsar.admin.* Pulsar 管理员配置
pulsar.client.* Pulsar 客户端配置,包括身份验证选项,例如 pulsar.client.authPluginClassNamepulsar.client.authParams
pulsar.reader.* Pulsar 读取器配置

有关 Pulsar 客户端和管理员身份验证选项的详细信息,请参阅 身份验证

身份验证

Azure Databricks 支持向 Pulsar 进行信任存储和密钥存储身份验证。 Azure Databricks建议使用机密来存储身份验证详细信息。 请参阅机密管理

密钥 默认 有效值 说明
pulsar.client.authPluginClassName None 完全限定的类名 身份验证插件的完全限定类名。 例如,org.apache.pulsar.client.impl.auth.AuthenticationTls
pulsar.client.authParams None 凭据字符串 作为字符串传递给身份验证插件的身份验证凭据。 例如,tlsCertFile:/path/to/my-role.cert.pem,tlsKeyFile:/path/to/my-role.key-pk8.pem
pulsar.client.useKeyStoreTls false truefalse 启用 true基于 KeyStore 的 TLS 配置而不是 PEM 格式化文件时。
pulsar.client.tlsTrustStoreType None 任意字符串 TLS 信任存储文件的格式。 例如,JKS
pulsar.client.tlsTrustStorePath None 文件路径字符串 包含受信任 CA 证书的 TLS 信任存储文件的路径。 当 pulsar.client.useKeyStoreTlstrue 时为必需项。
pulsar.client.tlsTrustStorePassword None 任意字符串 TLS 信任存储文件的密码。

如果流使用 a PulsarAdmin,还可以设置以下选项:

密钥 默认 有效值 说明
pulsar.admin.authPluginClassName None 完全限定的类名 Pulsar 管理员客户端的身份验证插件的完全限定类名。
pulsar.admin.authParams None 凭据字符串 Pulsar 管理员客户端身份验证插件的身份验证凭据。
pulsar.admin.useTls None truefalse 是否对 Pulsar 管理员客户端连接使用 TLS。
pulsar.admin.tlsAllowInsecureConnection None truefalse 是否允许 Pulsar 管理客户端的不安全 TLS 连接。
pulsar.admin.tlsTrustCertsFilePath None 文件路径字符串 Pulsar 管理员客户端的受信任 TLS 证书文件的路径。
pulsar.admin.useKeyStoreTls None truefalse 是否对 Pulsar 管理客户端使用基于 KeyStore 的 TLS。
pulsar.admin.tlsTrustStoreType None 任意字符串 Pulsar 管理客户端的 TLS 信任存储的格式。 例如,JKS
pulsar.admin.tlsTrustStorePath None 文件路径字符串 Pulsar 管理员客户端的 TLS 信任存储文件的路径。 当 pulsar.admin.useKeyStoreTlstrue 时为必需项。
pulsar.admin.tlsTrustStorePassword None 任意字符串 Pulsar 管理员客户端 TLS 信任存储的密码。

有关身份验证示例,请参阅 “向 Pulsar 进行身份验证”。

DataFrameWriter 选项

将这些选项用于 DataFrameWriter.option()DataFrameWriterV2.option()来控制Azure Databricks写入数据的方式。

Example

以下示例设置为mergeSchemaTrue写入 Delta Lake 表:

Python
df.write.format("delta").option("mergeSchema", True).saveAsTable("my_table")
Scala
df.write.format("delta").option("mergeSchema", "true").saveAsTable("my_table")

Avro

编写 Avro 文件时,以下选项适用。

密钥 默认 有效值 说明
avroSchema None JSON 架构字符串 完整的 Avro 架构作为 JSON 字符串。 使用此选项可将 Spark SQL 类型转换为特定的 Avro 类型。 适用于 读取和写入 Avro 文件
avroSchemaUrl None URL 字符串 指向 Avro 架构文件的 URL。 使用,而不是 avroSchema 在外部存储架构时使用。 与 avroSchema 互斥。 适用于 读取和写入 Avro 文件
compression snappy uncompresseddeflatesnappy (default)bzip2xzzstandard 编写时要使用的压缩编解码器。 适用于 读取和写入 Avro 文件
recordName topLevelRecord 任意字符串 输出 Avro 架构中的顶级记录名称。 适用于 读取和写入 Avro 文件
positionalFieldMatching false truefalse 是否按字段位置而不是按名称匹配 Spark 架构和 Avro 架构之间的列。 适用于 读取和写入 Avro 文件
recordNamespace 空字符串 任意字符串 输出 Avro 架构中顶级记录的命名空间。 适用于 读取和写入 Avro 文件

Delta Lake 和 Apache Iceberg

编写 Delta Lake 和 Apache Iceberg 表时,以下选项适用。

密钥 默认 有效值 说明
clusterByAuto false truefalse 是否启用自动液体聚类分析,其中Azure Databricks根据查询模式选择聚类分析列。 仅对 . 有效。mode("overwrite") 不能与模式一起使用 append 。 在 Databricks Runtime 16.4 及更高版本中可用。 适用于 对表使用液体聚类分析
mergeSchema None truefalse 是否为写入操作启用架构演变。 源 DataFrame 中的新列将添加到目标表架构。 适用于批处理和流式处理追加。 适用于 更新表架构
overwriteSchema None truefalse 是否在覆盖时替换表架构和分区。 需要 mode("overwrite") 没有 replaceWhere。 不能与 partitionOverwriteMode 一起使用。 适用于 更新表架构
partitionOverwriteMode None staticdynamic 分区覆盖模式。 将其设置为 dynamic 仅覆盖包含新数据的分区,使所有其他分区保持不变。 旧模式,在无服务器计算或 Databricks SQL 上不受支持。 适用于 选择性地使用 Delta Lake 覆盖数据
replaceOn None 布尔表达式字符串 一个布尔表达式,与目标表中的行匹配,以替换为源查询中的行。 可以引用目标表和源查询中的列。 将删除并替换与源行匹配的目标中的行。 如果源为空,则不会发生删除操作。 用于 targetAlias 消除列引用的歧义。 在 Databricks Runtime 17.1 及更高版本中可用。 适用于 选择性地使用 Delta Lake 覆盖数据
replaceUsing None 以逗号分隔的列名称列表 用于匹配目标表和源查询之间的行的列名的逗号分隔列表。 目标和源必须包含所有列出的列。 在相等比较下与源行匹配的目标中的行将被删除并替换。 NULL 值被视为不相等且不匹配。 在 Databricks Runtime 16.3 及更高版本中可用。 适用于 选择性地使用 Delta Lake 覆盖数据
replaceWhere None 谓词表达式字符串 谓词表达式。 以原子方式仅覆盖与谓词匹配的记录。 适用于 选择性地使用 Delta Lake 覆盖数据
targetAlias None 任意字符串 目标表的字符串别名。 replaceOn当条件同时引用目标表和源查询中的列时,请使用或replaceWhere消除列引用的歧义。 适用于 选择性地使用 Delta Lake 覆盖数据
txnAppId None 任意字符串 用于标识应用程序中幂等写入 foreachBatch 操作的应用程序的唯一字符串。 与 一 txnVersion 起使用,确保对多个 Delta Lake 表进行恰好一次的写入。 foreachBatch适用于幂等表写入
txnVersion None 单调递增整数 单调增加的数字,用作操作中幂等写入的 foreachBatch 事务版本。 与 一 txnAppId 起使用,确保对多个 Delta Lake 表进行恰好一次的写入。 foreachBatch适用于幂等表写入
optimizeWrite None truefalse 是否为此写入操作启用自动优化写入。 重写 spark.databricks.delta.optimizeWrite.enabled 配置。 适用于 Azure Databricks? 中的 Delta Lake 是什么。
userMetadata None 任意字符串 追加到写入操作的提交元数据中的用户定义的字符串。 在 . 的 DESCRIBE HISTORY输出中可见 适用于 使用自定义元数据扩充表

CSV

编写 CSV 文件时,以下选项适用。

密钥 默认 有效值 说明
charToEscapeQuoteEscaping \0 (未启用) 单个字符 与引号字符不同时,用于转义转义字符的字符。 适用于 csv(DataFrameWriter)。
compression none none (default)bzip2gziplz4snappydeflatezstd 编写时要使用的压缩编解码器。 适用于 csv(DataFrameWriter)。
dateFormat yyyy-MM-dd 日期格式字符串 日期列值的格式字符串。 适用于 csv(DataFrameWriter)。
emptyValue 空字符串 任意字符串 为空(非 null)值编写的字符串。 适用于 csv(DataFrameWriter)。
encoding UTF-8 名称java.nio.charset.Charset 输出文件的字符编码。 适用于 csv(DataFrameWriter)。
escape \ 单个字符 用于转义带引号值的字符。 适用于 csv(DataFrameWriter)。
escapeQuotes true truefalse 是否转义带引号字段值内的引号字符。 适用于 csv(DataFrameWriter)。
header false truefalse 是否将列名称写入输出的第一行。 适用于 csv(DataFrameWriter)。
ignoreLeadingWhiteSpace false truefalse 是否在写入时从值中剪裁前导空格。 适用于 csv(DataFrameWriter)。
ignoreTrailingWhiteSpace false truefalse 是否在写入时从值中剪裁尾随空格。 适用于 csv(DataFrameWriter)。
lineSep \n 一个字符串 记录之间使用的行分隔符字符串。 适用于 csv(DataFrameWriter)。
locale en-US 标识符java.util.Locale 一个 java.util.Locale 标识符。 标识Java区域设置,影响 CSV 中的默认日期、时间戳和十进制分析。
nullValue 空字符串 任意字符串 为 null 值编写的字符串。 适用于 csv(DataFrameWriter)。
quote " 单个字符 用于引用包含分隔符的字段值的字符。 适用于 csv(DataFrameWriter)。
quoteAll false truefalse 是否将所有字段值括在引号中,而不考虑内容。 适用于 csv(DataFrameWriter)。
sep , 一个字符串 字段分隔符字符。 适用于 csv(DataFrameWriter)。
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] 时间戳格式字符串 时间戳列值的格式字符串。 适用于 csv(DataFrameWriter)。
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] 时间戳格式字符串 设置时间戳的字符串格式,不带时区 (TimestampNTZType) 列值。

Excel

编写Excel文件时,以下选项适用。

密钥 默认 有效值 说明
dataAddress None 工作表名称或单元格引用字符串 写入的工作表名称或起始单元格。 如果省略,则写入从单元格Sheet1开始命名A1的工作表。 接受工作表名称(SheetName)或单个单元格引用(SheetName!A1)。 写入不支持单元格区域。
dateFormatInWrite yyyy-mm-dd Excel日期格式字符串 应用于 Date 列的单元格格式字符串Excel。 使用Excel格式语法。
headerRows 0 01 是否将列名写入第一行。
timestampNTZFormat yyyy-mm-dd hh:mm:ss Excel时间戳格式字符串 应用于 TimestampNTZTimestamp 列的单元格格式字符串Excel。 使用Excel格式语法。
version xlsx xlsxxls 要写入的Excel文件格式版本。

JSON

编写 JSON 文件时,以下选项适用。

密钥 默认 有效值 说明
compression none nonebzip2gziplz4snappydeflatezstd 编写时要使用的压缩编解码器。 适用于 json (DataFrameWriter)。
dateFormat yyyy-MM-dd 日期格式字符串 日期列值的格式字符串。 适用于 json (DataFrameWriter)。
encoding UTF-8 名称java.nio.charset.Charset 输出文件的字符编码。 适用于 json (DataFrameWriter)。
ignoreNullFields spark.sql.jsonGenerator.ignoreNullFields truefalse 是否省略 JSON 输出中具有 null 值的字段。 适用于 json (DataFrameWriter)。
lineSep \n 一个字符串 记录之间使用的行分隔符字符串。 适用于 json (DataFrameWriter)。
locale en-US 标识符java.util.Locale 影响 JSON 中默认日期、时间戳和十进制分析的Java区域设置标识符。
pretty false truefalse 是否启用漂亮的(缩进、多行)JSON 输出。
sortKeys false truefalse 是否按字母顺序对输出中的 JSON 对象的键进行排序。 可用于生成确定性输出。
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] 时间戳格式字符串 时间戳列值的格式字符串。 适用于 json (DataFrameWriter)。
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] 时间戳格式字符串 设置时间戳的字符串格式,不带时区 (TimestampNTZType) 列值。
writeNonAsciiCharacterAsCodePoint false truefalse 是否将非 ASCII 字符编码为 \uXXXX Unicode 转义序列,而不是输出中的文本 UTF-8 字符。

ORC

编写 ORC 文件时,以下选项适用。

密钥 默认 有效值 说明
compression zstd noneuncompressedsnappyzliblzozstdlz4brotli 编写时要使用的压缩编解码器。 适用于 orc(DataFrameWriter)。

Parquet

编写 Parquet 文件时,以下选项适用。

密钥 默认 有效值 说明
compression snappy noneuncompressedsnappygziplzobrotlilz4lz4_rawzstd 编写时要使用的压缩编解码器。 适用于 parquet(DataFrameWriter)。
spark.sql.parquet.outputTimestampType INT96 INT96TIMESTAMP_MICROSTIMESTAMP_MILLIS 用于对时间戳列进行编码的物理类型。 用于 INT96 与不支持标准时间戳类型的旧 Parquet 读取器兼容。

文本

编写文本文件时,以下选项适用。

密钥 默认 有效值 说明
compression none nonebzip2gziplz4snappydeflatezstd 编写时要使用的压缩编解码器。 适用于文本(DataFrameWriter)。
encoding UTF-8 名称java.nio.charset.Charset 输出文件的字符编码。
lineSep \n 一个字符串 记录之间使用的行分隔符字符串。 适用于文本(DataFrameWriter)。

XML

编写 XML 文件时,以下选项适用。

密钥 默认 有效值 说明
arrayElementName item 任意字符串 没有显式名称的数组元素的元素名称。 适用于 xml(DataFrameWriter)。
attributePrefix _ 任意字符串 前面追加到与 XML 属性对应的字段名称的前缀。 适用于 xml(DataFrameWriter)。
compression none nonebzip2gziplz4snappydeflatezstd 编写时要使用的压缩编解码器。 适用于 xml(DataFrameWriter)。
dateFormat yyyy-MM-dd 日期格式字符串 日期列值的格式字符串。 适用于 xml(DataFrameWriter)。
declaration version="1.0" encoding="UTF-8" standalone="yes" 要取消的 XML 声明字符串或空字符串 在每个输出文件的顶部写入的 XML 声明字符串。 设置为空字符串以禁止声明。 适用于 xml(DataFrameWriter)。
encoding UTF-8 名称java.nio.charset.Charset 输出文件的字符编码。 适用于 xml(DataFrameWriter)。
indent 4 个空格 任意字符串 用于缩进输出中的子元素的字符串。 设置为空字符串以关闭缩进,并在单个行上写入每行。
locale en-US 标识符java.util.Locale 影响 XML 中的默认日期、时间戳和小数格式的Java区域设置标识符。
nullValue null 任意字符串 为 null 值编写的字符串。 如果设置为 nullnull 字段,则省略 null 字段的属性和子元素。 适用于 xml(DataFrameWriter)。
rootTag ROWS 任意字符串 包装输出中所有行元素的根元素标记。 适用于 xml(DataFrameWriter)。
rowTag ROW 任意字符串 表示输出中的行的元素标记。 适用于 xml(DataFrameWriter)。
singleVariantColumn None 列名字符串 要写入 XML 文件的单个 Variant 列的名称。 适用于 xml(DataFrameWriter)。
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] 时间戳格式字符串 时间戳列值的格式字符串。 适用于 xml(DataFrameWriter)。
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] 时间戳格式字符串 设置时间戳的字符串格式,不带时区列值。 适用于 xml(DataFrameWriter)。
validateName true truefalse 如果列名不是有效的 XML 元素标识符,是否引发异常。 适用于 xml(DataFrameWriter)。
valueTag _VALUE 任意字符串 用于 XML 元素中也具有属性或子元素的字符数据的字段名称。 适用于 xml(DataFrameWriter)。

DataStreamWriter 选项

使用这些选项 DataStreamWriter.option() 来配置流式写入。

Example

以下示例设置流的检查点位置:

Python
(df.writeStream
  .format("delta")
  .option("checkpointLocation", "/path/to/checkpoint")
  .start("/path/to/table"))
Scala
df.writeStream
  .format("delta")
  .option("checkpointLocation", "/path/to/checkpoint")
  .start("/path/to/table")

Common

以下选项适用于所有流式写入操作。

密钥 默认 有效值 说明
checkpointLocation 无(必需) 路径字符串 流式处理查询的检查点目录的路径。 容错和恰好一次的处理保证是必需的。 每个流式处理查询都必须使用唯一的检查点位置。 Databricks 建议将检查点存储在 Unity 目录卷或云存储路径中。 请参阅结构化流式处理检查点
path None 路径字符串 基于文件的流式处理接收器(如 Parquet)的输出路径。 仅适用于基于文件的格式。

控制台接收器

将流写入控制台接收器时,以下选项适用。

密钥 默认 有效值 说明
numRows 20 正整数 写入控制台接收器时,要为每个微批处理显示的行数。
truncate true truefalse 是否在显示行时截断长字符串。 设置为 false 显示完整字符串值。

Delta Lake

使用 format("delta") 将流写入 Delta Lake 表时,以下选项适用。 仅覆盖选项,例如overwriteSchemareplaceWhere,不支持partitionOverwriteMode流式写入。

密钥 默认 有效值 说明
mergeSchema false truefalse 流式处理数据帧包含新列时是否要改进 Delta Lake 表架构。 仅适用于追加输出模式。 适用于 更新表架构
userMetadata None 任意字符串 追加到写入操作的提交元数据中的用户定义的字符串。 在 . 的 DESCRIBE HISTORY输出中可见 适用于 使用自定义元数据扩充表

文件存储端

将流写入基于文件的格式(Parquet、JSON、CSV、ORC、text)时,适用以下选项。 有关特定于格式的选项,请参阅 DataFrameWriter 选项

密钥 默认 有效值 说明
retention None 时间字符串,例如 7 days24 hours 保留用于容错和压缩的接收器元数据文件的时长。 如果未设置,元数据文件将无限期保留。

Kafka 接收器

写入 Kafka 时,以下选项适用。

密钥 默认 有效值 说明
kafka.bootstrap.servers None 字符串的 host:port 逗号分隔列表 必填。 Kafka 中转站 host:port 地址的逗号分隔列表。
topic None 任意字符串 所有行的目标 Kafka 主题。 如果 DataFrame 不包含列, topic 则为必需。
kafka.* None 任何 Kafka 生成者配置 任何带有前缀的 kafka.。 例如,kafka.compression.type

内存接收器

将流写入内存接收器时,以下选项适用。

密钥 默认 有效值 说明
queryName 无(必需) 任意字符串 查询写入的内存中表的名称。 内存接收器是必需的。 也可以通过 .queryName(). 进行配置。
mode exactlyonce exactlyonceatleastonce 内存接收器的传递保证。 exactlyonce 使用具有完全一次语义的微批处理模式。 atleastonce 使用具有至少一次语义的连续模式。

Spark 函数选项

某些 Spark SQL 内置函数接受控制 options 分析或序列化行为的映射。 将选项作为 Python dict 或 Scala Map[String, String] 传递。

Example

以下示例在删除格式不正确的记录时分析 JSON 列:

Python
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([StructField("name", StringType())])
df = df.withColumn("parsed", from_json("json_col", schema, {"mode": "DROPMALFORMED"}))
Scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

val schema = StructType(Seq(StructField("name", StringType)))
val df = df.withColumn("parsed", from_json(col("json_col"), schema, Map("mode" -> "DROPMALFORMED")))

Avro

Avro 函数接受与相应的数据帧选项相同的选项:

Example

以下示例解码启用了架构演变的 Avro 列:

Python
from pyspark.sql.functions import from_avro

df = df.withColumn("decoded", from_avro("avro_col", json_schema, {"avroSchemaEvolutionMode": "restart"}))
Scala
import org.apache.spark.sql.avro.functions.from_avro

val df = df.withColumn("decoded", from_avro(col("avro_col"), jsonSchema, Map("avroSchemaEvolutionMode" -> "restart")))

此外,架构注册表变体和from_avroto_avro接受以下选项:

密钥 默认 有效值 说明
schemaId None 架构 ID 整数 从 Confluent 架构注册表中解码与架构不兼容 jsonFormatSchema的 Avro 数据时要使用的架构 ID。 from_avro仅适用于。
confluent.schema.registry.* None 任何 Confluent SR 客户端属性值 Confluent 架构注册表客户端配置属性。 使用此前缀传递任何 Confluent SR 客户端属性,例如 confluent.schema.registry.basic.auth.user.info 基本身份验证凭据。 架构注册表变体 from_avroto_avro.

CSV

CSV 函数接受与相应的数据帧选项相同的选项:

Example

以下示例使用自定义分隔符和 NULL 值读取 CSV:

Python
from pyspark.sql.functions import from_csv
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([StructField("id", IntegerType()), StructField("name", StringType())])
df = df.withColumn("parsed", from_csv("csv_col", schema, {"sep": "|", "nullValue": "N/A"}))
Scala
import org.apache.spark.sql.functions.from_csv
import org.apache.spark.sql.types._

val schema = StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
val df = df.withColumn("parsed", from_csv(col("csv_col"), schema, Map("sep" -> "|", "nullValue" -> "N/A")))

JSON

JSON 函数接受与相应的 DataFrame 选项相同的选项:

Example

以下示例编写 JSON,其中 NULL 字段被忽略并启用了漂亮的格式设置:

Python
from pyspark.sql.functions import to_json

df = df.withColumn("json_str", to_json("struct_col", {"pretty": "true", "ignoreNullFields": "true"}))
Scala
import org.apache.spark.sql.functions.to_json

val df = df.withColumn("json_str", to_json(col("struct_col"), Map("pretty" -> "true", "ignoreNullFields" -> "true")))

Protobuf

from_protobuf 不使用 to_protobuf 基于文件的 DataSource。 Protobuf 数据始终使用这些函数读取和写入为二进制列。 选项以区分 Map[String, String] 大小写的形式传递。

Example

以下示例使用 PERMISSIVE 模式解码 Protobuf 列:

Python
from pyspark.sql.functions import from_protobuf

df = df.withColumn("decoded", from_protobuf("proto_col", "MyMessage", "/path/to/descriptor.desc",
    {"mode": "PERMISSIVE", "enums.as.ints": "true"}))
Scala
import org.apache.spark.sql.protobuf.functions.from_protobuf

val df = df.withColumn("decoded", from_protobuf(col("proto_col"), "MyMessage", "/path/to/descriptor.desc",
    Map("mode" -> "PERMISSIVE", "enums.as.ints" -> "true")))

Protobuf 函数使用以下选项:

密钥 默认 有效值 说明
mode FAILFAST FAILFASTPERMISSIVE 如何处理损坏的记录。 FAILFAST 引发异常。 PERMISSIVE 将格式不正确的字段设置为 null。 适用于 from_protobuf.
recursive.fields.max.depth -1 (已禁用) 010 递归 Protobuf 字段的最大递归深度。 设置为 0 关闭递归字段支持。 适用于 from_protobuf.
convert.any.fields.to.json false truefalse 是否将 Protobuf Any 字段转换为 JSON 字符串而不是 STRUCT. 适用于 from_protobuf.
emit.default.values false truefalse 是发出包含零或默认值(proto3 语义)的字段。 当输出中省略具有默认值的字段时 false。 适用于 from_protobuf.
enums.as.ints false truefalse 是否将枚举字段呈现为整数值而不是字符串。 适用于 from_protobuf.
upcast.unsigned.ints false truefalse 是否向上转换uint32Longuint64Decimal(20,0)防止整数溢出。 适用于 from_protobuf.
unwrap.primitive.wrapper.types false truefalse 是否将包装包装器类型(例如google.protobuf,和Int32Value)解包StringValue到相应的基元 Spark 类型。 适用于 from_protobuf.
retain.empty.message.types false truefalse 是否通过插入虚拟列来保留输出架构中的空 Protobuf 消息类型。 适用于 from_protobuf.
schema.registry.subject None 任意字符串 架构注册表使用者名称。 使用架构注册表变体 from_protobufto_protobuf.
schema.registry.address None 字符串host:port 架构注册表地址(主机和端口)。 使用架构注册表变体 from_protobufto_protobuf.
schema.registry.protobuf.name None 任意字符串 指定架构注册表主体包含多个消息时要使用的 Protobuf 消息。 可选。
schema.registry.schema.evolution.mode "restart" "restart""none" 在传入记录中检测到较新的架构 ID 时,如何处理架构更改。 "restart" 使用 ; UnknownFieldException配置作业以在无法选取更改时重启查询。 "none" 忽略架构 ID 更改,并使用原始架构分析较新的记录。
confluent.schema.registry.<option> 任何有效的 Confluent 架构注册表客户端选项值 使用前缀传递任何 "confluent.schema.registry"选项。 例如,设置为"confluent.schema.registry.basic.auth.credentials.source""USER_INFO""confluent.schema.registry.basic.auth.user.info"配置"<KEY>:<SECRET>"基本身份验证。

XML

XML 函数接受与相应的 DataFrame 选项相同的选项:

Example

以下示例使用自定义根标记和行标记编写 XML:

Python
from pyspark.sql.functions import to_xml

df = df.withColumn("xml_str", to_xml("struct_col", {"rootTag": "records", "rowTag": "record"}))
Scala
import org.apache.spark.sql.functions.to_xml

val df = df.withColumn("xml_str", to_xml(col("struct_col"), Map("rootTag" -> "records", "rowTag" -> "record")))