此页面列出了用于读取和写入数据的 Spark API 的可用输入和输出选项。
DataFrameReader 选项
将这些选项用于 DataFrameReader.option()、DataFrameReader.options()、read_files、COPY INTO 和 Auto Loader 来控制Azure Databricks读取数据文件的方式。
Example
以下示例设置为multiLineTrue读取 JSON 文件:
Python
df = spark.read.format("json").option("multiLine", True).load("/path/to/data")
Scala
val df = spark.read.format("json").option("multiLine", "true").load("/path/to/data")
SQL
SELECT * FROM read_files("/path/to/data", format => "json", multiLine => true)
Common
以下选项适用于所有文件格式。
| 密钥 | 默认 | 有效值 | 说明 |
|---|---|---|---|
ignoreCorruptFiles |
false |
true、false |
是否忽略损坏的文件。 如果为 true,则当遇到损坏的文件时,Spark 作业将继续运行,并且仍会返回已读取的内容。 因此COPY INTO,可以观察已跳过的文件,numSkippedCorruptFiles如 operationMetrics Delta Lake 历史记录列中所示。 在 Databricks Runtime 11.3 LTS 及更高版本中可用。 |
ignoreMissingFiles |
false 用于自动加载程序( trueCOPY INTO 旧版) |
true、false |
是否忽略缺少的文件。 如果为 true,则 Spark 作业在遇到缺少的文件且仍返回内容时继续运行。 在 Databricks Runtime 11.3 LTS 及更高版本中可用。 |
modifiedAfter |
None | 时间戳字符串 | 可选时间戳作为筛选器,用于仅引入在指定时间戳之后具有修改时间戳的文件。 |
modifiedBefore |
None | 时间戳字符串 | 可选时间戳作为筛选器,用于仅引入在指定时间戳之前具有修改时间戳的文件。 |
pathGlobFilter 或 fileNamePattern |
None | glob 模式字符串 | 选择文件的潜在 glob 模式。 等效于 PATTERN ( COPY INTO 旧版)。
fileNamePattern 可在 read_files 中使用。 |
recursiveFileLookup |
false |
true、false |
当此选项搜索嵌套目录时 true,即使它们的名称不遵循分区命名方案(例如 date=2019-07-01)。 |
Avro
读取 Avro 文件时,以下选项适用。
| 密钥 | 默认 | 有效值 | 说明 |
|---|---|---|---|
avroSchema |
None | Avro 架构字符串 | 用户以 Avro 格式指定的可选架构。 读取 Avro 时,此选项可以设置为兼容但不同于实际 Avro 架构的不断发展架构。 反序列化架构与不断发展的架构一致。 例如,如果设置一个具有默认值的其他列的演变架构,则读取结果也包含新列。 |
avroSchemaEvolutionMode |
none |
none、restart |
如何使用架构注册表处理架构演变。
none 忽略架构更改并继续执行作业。
restart
UnknownFieldException在检测到架构更改并要求重启作业时引发架构更改。 |
datetimeRebaseMode |
LEGACY |
EXCEPTION、LEGACY、CORRECTED |
控制 DATE 和 TIMESTAMP 值在儒略历与外推格里历之间的基本值重定。 |
enableStableIdentifiersForUnionType |
false |
true、false |
是否对 Avro Union 类型使用稳定的字段名称。 启用后,联合类型字段名称派生自其小写类型名称(例如, member_int) member_string。 如果两个类型名称在下限后相同,则引发异常。 |
mergeSchema |
false |
true、false |
是否在多个文件中推断模式并合并每个文件的模式。 Avro 的 mergeSchema 不放宽数据类型。 |
mode |
FAILFAST |
FAILFAST、PERMISSIVE、DROPMALFORMED |
用于处理损坏记录的分析器模式。
FAILFAST 引发异常。
PERMISSIVE 将格式不正确的字段设置为 null。
DROPMALFORMED 静默删除错误的记录。 |
readerCaseSensitive |
true |
true、false |
指定启用 rescuedDataColumn 时区分大小写的行为。 如果为 true,请拯救名称不同于架构的数据列。 如果为 false,则以不区分大小写的方式读取数据。 |
recursiveFieldMaxDepth |
None |
0 至 15 |
递归 Avro 字段的最大递归深度。 设置为1截断所有递归字段,2以允许一级递归等。15 取消设置或 0不允许递归字段。 |
rescuedDataColumn |
None | 列名字符串 | 是否将因数据类型不匹配和架构不匹配(包括列大小写)而无法分析的所有数据收集到单独的列。 使用自动加载程序时,默认包含此列。COPY INTO(旧版)不支持已获救的数据列,因为无法使用 COPY INTO手动设置架构。 Databricks 建议对大多数引入方案使用自动加载程序。有关更多详细信息,请参考什么是已恢复的数据列?。 |
stableIdentifierPrefixForUnionType |
member_ |
任意字符串 | 用于稳定联合类型字段名称的 enableStableIdentifiersForUnionType=true前缀。 |
CSV
读取 CSV 文件时,以下选项适用。
| 密钥 | 默认 | 有效值 | 说明 |
|---|---|---|---|
badRecordsPath |
None | 路径字符串 | 用于记录错误 CSV 记录信息的文件存储路径。 |
charToEscapeQuoteEscaping |
\0 |
单个字符 | 用来对引号的转义字符进行转义的字符。 例如,对于以下记录:[ " a\\", b ]:
|
columnNameOfCorruptRecord |
_corrupt_record |
列名字符串 | 支持自动加载程序。 不支持 COPY INTO(旧版)。用于存储因格式不正确而无法分析的记录的列。 如果用于分析的 mode 设置为 DROPMALFORMED,则此列将为空。 |
comment |
\0 |
单个字符 | 定义表示行注释的字符(位于文本行的开头时)。 请使用 '\0' 来禁用注释跳过。 |
dateFormat |
yyyy-MM-dd |
日期格式字符串 | 用于分析日期字符串的格式。 |
emptyValue |
空字符串 | 任意字符串 | 空值的字符串表示形式。 |
enableDateTimeParsingFallback |
false |
true、false |
当不能使用指定格式分析值时,是否回退到旧日期和时间时间戳分析行为。 当 false,分析失败时会引发错误或生成 null, mode具体取决于。 |
encoding 或 charset |
UTF-8 |
名称java.nio.charset.Charset |
CSV 文件编码的名称。 有关选项列表,请参阅 java.nio.charset.Charset。 当 UTF-16 为 UTF-32 时,不能使用 multiline 和 true。 |
enforceSchema |
true |
true、false |
是否将指定的或推理出的架构强制应用于 CSV 文件。 如果启用此选项,则会忽略 CSV 文件的标题。 默认情况下,当使用自动加载程序来补救数据并允许架构演变时,会忽略此选项。 |
escape |
\ |
单个字符 | 分析数据时要使用的转义字符。 |
extension |
csv |
文件扩展名字符串 | 读取的预期文件扩展名。 不带此扩展名的文件将被筛选掉。 |
failOnUnknownFields |
false |
true、false |
CSV 记录是否包含架构中不存在的列时是否失败。 何时 false,无法识别的列将静默删除或救援, rescuedDataColumn具体取决于。 |
failOnWidenedFields |
false |
true、false |
如果字段值在未扩大的情况下无法解析为声明的架构类型,则是否失败。 当 false,类型扩大的值会根据情况 rescuedDataColumn以无提示方式进行救援。 设置 failOnUnknownFields=true 可以屏蔽此选项的效果。 |
header |
false |
true、false |
CSV 文件是否包含标题。 自动加载程序在推理架构时会假定文件具有标题。 |
ignoreLeadingWhiteSpace |
false |
true、false |
是否忽略每个所分析值的前导空格。 |
ignoreTrailingWhiteSpace |
false |
true、false |
是否忽略每个解析值的尾随空格。 |
inferSchema |
false |
true、false |
是推断所解析的 CSV 记录的数据类型,还是假定所有列都是 StringType 类型的。 如果设置为 true,则需要对数据进行另一轮操作。 对于自动加载程序,请改用 cloudFiles.inferColumnTypes。 |
inputBufferSize |
1048576 (1 MB) |
正整数 | CSV 分析程序缓冲区大小(以字节为单位)。 用于在分析大型 CSV 文件时优化内存使用情况。 |
lineSep |
无,涵盖 \r、 \r\n和 \n |
一个字符串 | 两个连续 CSV 记录之间的字符串。 |
locale |
US |
标识符java.util.Locale |
标识Java区域设置,影响 CSV 中的默认日期、时间戳和十进制分析。 |
maxCharsPerColumn |
-1 |
正整数或 -1 无限制 |
预期要解析的值的最大字符数。 可用于避免内存错误。 默认为 -1,表示无限制。 |
maxColumns |
20480 |
正整数 | 记录可以包含的列数的硬限制。 |
mergeSchema |
false |
true、false |
是否在多个文件中推断模式并合并每个文件的模式。 已默认在推理架构时为自动加载程序启用。 |
mode |
PERMISSIVE |
PERMISSIVE、DROPMALFORMED、FAILFAST |
围绕处理格式错误的记录提供的分析程序模式。 |
multiLine |
false |
true、false |
CSV 记录是否跨多行。 |
nanValue |
NaN |
任意字符串 | 分析 FloatType 和 DoubleType 列时非数字值的字符串表示形式。 |
negativeInf |
-Inf |
任意字符串 | 分析 FloatType 或 DoubleType 列时负无穷大的字符串表示形式。 |
nullValue |
空字符串 | 任意字符串 | null 值的字符串表示形式。 |
parserCaseSensitive(已弃用) |
false |
true、false |
读取文件时,将标题中声明的列与架构对齐时是否区分大小写。 对于自动加载程序,此项默认为 true。 如果启用,则会在 rescuedDataColumn 中补救大小写不同的列。 出于对 readerCaseSensitive 的偏好,已不推荐使用此选项。 |
positiveInf |
Inf |
任意字符串 | 分析 FloatType 或 DoubleType 列时正无穷大的字符串表示形式。 |
preferDate |
true |
true、false |
如果可能,尝试将字符串推断为日期而不是时间戳。 此外,还必须通过启用 inferSchema 或使用 cloudFiles.inferColumnTypes 自动加载程序来使用架构推理。 |
quote |
" |
单个字符 | 当字段分隔符是值的一部分时用于对值进行转义的字符。 |
readerCaseSensitive |
true |
true、false |
指定启用 rescuedDataColumn 时区分大小写的行为。 如果为 true,请拯救名称不同于架构的数据列。 如果为 false,则以不区分大小写的方式读取数据。 |
rescuedDataColumn |
None | 列名字符串 | 是否将因数据类型不匹配和架构不匹配(包括列大小写)而无法分析的所有数据收集到单独的列。 使用自动加载程序时,默认包含此列。 有关更多详细信息,请参考什么是已恢复的数据列?。COPY INTO(旧版)不支持已获救的数据列,因为无法使用 COPY INTO手动设置架构。 Databricks 建议对大多数引入方案使用自动加载程序。 |
sep 或 delimiter |
, |
一个字符串 | 列之间的分隔符字符串。 |
singleVariantColumn |
None | 列名字符串 | 当设置为列名时,将整个 CSV 记录读入具有该名称的单个 VariantType 列,而不是将每个字段分析为其自己的列。 需要 header=true。 |
skipRows |
0 |
正整数或 0 |
应忽略的 CSV 文件开头的行数,包括注释行和空行。 如果 header 为 true,则标头将是第一个未跳过和未注释的行。 |
timeFormat |
HH:mm:ss |
时间格式字符串 | 分析 TimeType 列值的格式。 |
timestampFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] |
时间戳格式字符串 | 用于分析时间戳字符串的格式。 |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] |
时间戳格式字符串 | 不带时区 (TimestampNTZType) 字符串的时间戳分析格式。 |
timeZone |
None | 字符串java.time.ZoneId |
分析时间戳和日期时要使用的 java.time.ZoneId。 |
unescapedQuoteHandling |
STOP_AT_DELIMITER |
STOP_AT_CLOSING_QUOTE、BACK_TO_DELIMITER、STOP_AT_DELIMITER、SKIP_VALUE、RAISE_ERROR |
用于处理未转义的引号的策略。 每个允许选项的行为如下所示:
|
Excel
读取Excel文件时,以下选项适用。
| 密钥 | 默认 | 有效值 | 说明 |
|---|---|---|---|
dataAddress |
None | 单元格区域或工作表名称字符串 | 要以 Excel 语法读取的单元格区域。 如果省略,则从第一个工作表读取所有有效单元格。 用于 SheetName!C5:H10 从命名工作表读取范围、 C5:H10 从第一个工作表读取区域或 SheetName 从特定工作表读取所有数据。 |
headerRows |
0 |
0、1 |
用作列名标题的初始行数。 指定后 dataAddress ,这将在单元格范围内应用。 当0,列名称将自动生成为_c1,_c2_c3等等。 |
ignoreMissingSheet |
false |
true、false |
是否以无提示方式跳过不包含指定 dataAddress工作表的文件。 如果 false文件缺少请求的工作表,则会引发错误。 仅在指定工作表名称 dataAddress时适用。 |
includePhoneticRuns |
false |
true、false |
在读取 XLSX 文件时,是否包含拼音注释(如 pinyin 或 furigana)连接到单元格字符串值。 |
operation |
readSheet |
readSheet、listSheets |
对Excel工作簿执行的操作。
readSheet 从工作表中读取数据。
listSheets 返回一个包含字段 sheetIndex: long 和 sheetName: String 每个工作表的结构。 |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] |
时间戳格式字符串 | 在Excel中存储为字符串的时间戳无时区值的自定义格式字符串。 自定义日期格式遵循 Datetime 模式中的格式。 |
dateFormat |
yyyy-MM-dd |
日期格式字符串 | 读取为字符串 Date值的自定义格式字符串。 自定义日期格式遵循 Datetime 模式中的格式。 |
JSON
读取 JSON 文件时,以下选项适用。
| 密钥 | 默认 | 有效值 | 说明 |
|---|---|---|---|
allowBackslashEscapingAnyCharacter |
false |
true、false |
是否允许反斜杠对其后面的任何字符进行转义。 如果未启用,则只能对按 JSON 规范显式列出的字符进行转义。 |
allowComments |
false |
true、false |
是否允许在分析的内容中使用 Java、C 和 C++ 样式的注释('/'、'*' 和 '//' 变体)。 |
allowNonNumericNumbers |
true |
true、false |
是否允许将非数字 (NaN) 标记集用作合法浮点数字值。 |
allowNumericLeadingZeros |
false |
true、false |
是否允许整数以附加的(可忽略的)零开头(例如 000001)。 |
allowSingleQuotes |
true |
true、false |
是否允许使用单引号(撇号字符 '\')来引用字符串(名称和字符串值)。 |
allowUnquotedControlChars |
false |
true、false |
是否允许 JSON 字符串包含未转义的控制字符(值小于 32 的 ASCII 字符,包括制表符和换行符)。 |
allowUnquotedFieldNames |
false |
true、false |
是否允许使用 JavaScript 允许的未引用字段名称,但不允许由 JSON 规范使用。 |
alternateVariantEncoding |
None | Z85 |
用于源 JSON 中 Variant 值的编码。 设置为 Z85 解码 Base85 编码的 Variant 值,而不是存储为内联 JSON。 |
badRecordsPath |
None | 路径字符串 | 用于存储记录错误 JSON 信息的文件路径。 在基于文件的数据源中使用 badRecordsPath选项有以下限制:
|
columnNameOfCorruptRecord |
_corrupt_record |
列名字符串 | 用于存储因格式不正确而无法分析的记录的列。 如果用于分析的 mode 设置为 DROPMALFORMED,则此列将为空。 |
dateFormat |
yyyy-MM-dd |
日期格式字符串 | 用于分析日期字符串的格式。 |
dropFieldIfAllNull |
false |
true、false |
在进行架构推理期间是否忽略所有 null 值或空数组和结构的列。 |
encoding 或 charset |
UTF-8 |
名称java.nio.charset.Charset |
JSON 文件编码的名称。 有关选项列表,请参阅 java.nio.charset.Charset。 当 UTF-16 为 UTF-32 时,不能使用 multiline 和 true。 |
inferTimestamp |
false |
true、false |
是否尝试将时间戳字符串推理为 TimestampType。 设置为 true架构推理时,架构推理可能需要明显更长的时间。 必须启用 cloudFiles.inferColumnTypes 才能与自动加载程序一起使用。 |
lineSep |
无,涵盖 \r、 \r\n和 \n |
一个字符串 | 两个连续 JSON 记录之间的字符串。 |
locale |
US |
标识符java.util.Locale |
影响 JSON 中默认日期、时间戳和十进制分析的Java区域设置标识符。 |
maxNestingDepth |
500 |
正整数 | JSON 对象和数组允许的最大嵌套深度。 为深度嵌套文档增加此值。 |
maxNumLen |
1000 |
正整数 | JSON 输入中数字令牌的最大长度。 为包含大数值文本的 JSON 增加此值。 |
maxStringLen |
不限制 | 正整数 | JSON 输入中字符串值的最大长度。 设置为使用大型字符串分析 JSON 时限制内存使用量。 |
mode |
PERMISSIVE |
PERMISSIVE、DROPMALFORMED、FAILFAST |
围绕处理格式错误的记录提供的分析程序模式。 |
multiLine |
false |
true、false |
JSON 记录是否跨多行。 |
prefersDecimal |
false |
true、false |
如果可能,尝试将字符串推断为 DecimalType 而不是浮点型或双精度型。 此外,还必须通过启用 inferSchema 或使用 cloudFiles.inferColumnTypes 自动加载程序来使用架构推理。 |
primitivesAsString |
false |
true、false |
是否将数字和布尔值等基元类型推理为 StringType。 |
readerCaseSensitive |
true |
true、false |
指定启用 rescuedDataColumn 时区分大小写的行为。 如果为 true,请拯救名称不同于架构的数据列。 如果为 false,则以不区分大小写的方式读取数据。 在 Databricks Runtime 13.3 及更高版本中可用。 |
rescuedDataColumn |
None | 列名字符串 | 是否将因数据类型不匹配或架构不匹配(包括列大小写)而无法分析的所有数据收集到单独的列。 使用自动加载程序时,默认包含此列。 有关更多详细信息,请参阅什么是已恢复的数据列?COPY INTO(旧版)不支持已获救的数据列,因为无法使用 COPY INTO手动设置架构。 Databricks 建议对大多数引入方案使用自动加载程序。 |
singleVariantColumn |
None | 列名字符串 | 是否引入整个 JSON 文档,分析为具有指定字符串的单个 Variant 列作为列的名称。 如果未设置,JSON 字段将引入其自己的列。 |
timestampFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] |
时间戳格式字符串 | 用于分析时间戳字符串的格式。 |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] |
时间戳格式字符串 | 不带时区 (TimestampNTZType) 字符串的时间戳分析格式。 |
timeZone |
None | 字符串java.time.ZoneId |
分析时间戳和日期时要使用的 java.time.ZoneId。 |
upgradeExceptionAsBadRecord |
false |
true、false |
是否将类型升级异常(例如,当值不能扩大为声明的列类型时)视为错误的记录,而不是引发异常。 |
Kafka
有关 Kafka 读取器选项的完整列表,请参阅 DataStreamReader Kafka 选项。 以下选项仅适用于使用 spark.read.format("kafka").
| 密钥 | 默认 | 有效值 | 说明 |
|---|---|---|---|
endingOffsets |
latest |
latest或 JSON 偏移字符串 |
停止阅读的位置。 在 JSON 字符串中, -1 是最新的偏移量。
-2作为最早的偏移量,不允许作为结束偏移量。 这是一个 JSON 偏移字符串示例: {"topicA":{"0":50,"1":-1}} |
endingOffsetsByTimestamp |
None | JSON 时间戳字符串 | 以毫秒为单位指定为时间戳的按分区结束偏移量。 例如: {"topicA":{"0":2000,"1":3000}}。 |
endingTimestamp |
None | 正整数或 0 |
应用于所有分区的全局结束时间戳(以毫秒为单位)。 |
ORC
读取 ORC 文件时,以下选项适用。
| 密钥 | 默认 | 有效值 | 说明 |
|---|---|---|---|
mergeSchema |
false |
true、false |
是否在多个文件中推断模式并合并每个文件的模式。 |
Parquet
读取 Parquet 文件时,以下选项适用。
| 密钥 | 默认 | 有效值 | 说明 |
|---|---|---|---|
datetimeRebaseMode |
LEGACY |
EXCEPTION、LEGACY、CORRECTED |
控制 DATE 和 TIMESTAMP 值在儒略历与外推格里历之间的基本值重定。 |
int96RebaseMode |
LEGACY |
EXCEPTION、LEGACY、CORRECTED |
控制 INT96 时间戳值在儒略历与外推格里历之间的基本值重定。 |
mergeSchema |
false |
true、false |
是否在多个文件中推断模式并合并每个文件的模式。 |
readerCaseSensitive |
true |
true、false |
指定启用 rescuedDataColumn 时区分大小写的行为。 如果为 true,请拯救名称不同于架构的数据列。 如果为 false,则以不区分大小写的方式读取数据。 |
rescuedDataColumn |
None | 列名字符串 | 是否将因数据类型不匹配和架构不匹配(包括列大小写)而无法分析的所有数据收集到单独的列。 使用自动加载程序时,默认包含此列。 有关更多详细信息,请参考什么是已恢复的数据列?。COPY INTO(旧版)不支持已获救的数据列,因为无法使用 COPY INTO手动设置架构。 Databricks 建议对大多数引入方案使用自动加载程序。 |
状态存储
将这些选项用于 spark.read.format("statestore") 或 read_statestore 表值函数来读取结构化流式处理状态数据。 请参阅读取结构化流式处理状态信息。
| 密钥 | 默认 | 有效值 | 说明 |
|---|---|---|---|
batchId |
最新批处理 ID | 正整数或 0 |
要从中读取的目标批处理。 用于查询查询的早期状态。 该批必须提交,但尚未清理。 |
operatorId |
0 |
正整数或 0 |
要从中读取的目标运算符。 当查询具有多个有状态运算符时使用。 |
storeName |
DEFAULT |
任意字符串 | 要从中读取的目标状态存储名称。 当有状态运算符具有多个状态存储实例时使用。 必须指定流联接或storeNamejoinSide同时指定流联接,但不能同时指定两者。 |
joinSide |
None |
left、right |
要从中读取流联接的目标端。 必须指定流联接或storeNamejoinSide同时指定流联接,但不能同时指定两者。 |
snapshotStartBatchId |
None | 正整数或 0 |
在读取状态时用作起始点的快照的批处理 ID。 读取器通过重播此快照的更改重新生成状态,直到 batchId此快照。 当快照损坏时非常有用。 必须一 snapshotPartitionId起指定 。 不能与 .一起使用 readChangeFeed。 支持启用了更改日志检查点的 HDFS 支持的状态存储和 RocksDB 状态存储。 在 Databricks Runtime 15.4 LTS 及更高版本中可用。 |
snapshotPartitionId |
None | 正整数或 0 |
如果指定,查询将仅读取此分区。 必须一 snapshotStartBatchId起指定 。 不能与 .一起使用 readChangeFeed。 在 Databricks Runtime 15.4 LTS 及更高版本中可用。 |
readChangeFeed |
false |
true、false |
当true,返回指定范围之间的批和changeStartBatchIdchangeEndBatchId之间的状态更改。 需要 changeStartBatchId。 不能与 joinSide、 batchId、 snapshotStartBatchId或 snapshotPartitionId. 在 Databricks Runtime 16.4 LTS 及更高版本中可用。有关详细信息,请参阅 “读取结构化流式处理”状态更改。 |
changeStartBatchId |
None | 正整数或 0 |
更改源范围的起始批 ID。 当 readChangeFeed 是 true 时为必需项。 仅当 readChangeFeed 设置为 true.. 在 Databricks Runtime 16.4 LTS 及更高版本中可用。 |
changeEndBatchId |
最新批处理 ID | 正整数或 0 |
更改源范围的结束批 ID。 必须大于或等于 changeStartBatchId。 仅当 readChangeFeed 设置为 true.. 在 Databricks Runtime 16.4 LTS 及更高版本中可用。 |
stateVarName |
None | 任意字符串 | 要读取的状态变量名称。 状态变量名称是运算符使用的init函数中StatefulProcessor每个变量的唯一transformWithState名称。 使用 transformWithState 运算符时是必需的。 在 Databricks Runtime 16.4 LTS 及更高版本中可用。 |
readRegisteredTimers |
false |
true、false |
当 true读取运算符使用的 transformWithState 已注册计时器时。
transformWithState仅适用于运算符。 在 Databricks Runtime 16.4 LTS 及更高版本中可用。 |
flattenCollectionTypes |
true |
true、false |
当 true,平展为映射和列表状态变量返回的记录。 当 false,以 Spark SQL Array 或 Map.
transformWithState仅适用于运算符。 在 Databricks Runtime 16.4 LTS 及更高版本中可用。 |
文本
读取文本文件时,以下选项适用。
| 密钥 | 默认 | 有效值 | 说明 |
|---|---|---|---|
encoding |
UTF-8 |
名称java.nio.charset.Charset |
TEXT 文件行分隔符的编码的名称。 文件的内容不受此选项的影响,仍会读取as-is。 |
lineSep |
无,涵盖\r\r\n和\n |
一个字符串 | 两个连续 TEXT 记录之间的字符串。 |
wholeText |
false |
true、false |
是否将文件读取为单个记录。 |
XML
读取 XML 文件时,以下选项适用。
| 密钥 | 默认 | 有效值 | 说明 |
|---|---|---|---|
rowTag |
None | 任意字符串 | 要作为行处理的 XML 文件的行标记。 在示例 XML <book> <page><page>...<book> 中,相应的值为 page。 这是必需选项。 |
samplingRatio |
1.0 |
0.0 至 1.0 |
定义用于架构推理的行的一部分。 XML 内置函数会忽略此选项。 |
excludeAttribute |
false |
true、false |
是否排除元素中的属性。 |
mode |
None |
PERMISSIVE、DROPMALFORMED、FAILFAST |
在解析过程中处理损坏的记录的一种模式。
|
inferSchema |
true |
true、false |
如果为 true,则尝试推断每个生成的 DataFrame 列的相应类型。 如果为 false,则生成的所有列均为 string 类型。 XML 内置函数会忽略此选项。 |
columnNameOfCorruptRecord |
spark.sql.columnNameOfCorruptRecord |
列名字符串 | 允许重命名包含模式 PERMISSIVE 创建的格式不正确的字符串的新字段。 |
attributePrefix |
None | 任意字符串 | 属性的前缀,用于区分属性和元素。 这将是字段名称的前缀。 默认值为 _。 读取 XML 时可以为空,但写入时不能为空。 也适用于 DataFrameWriter XML 选项。 |
valueTag |
_VALUE |
任意字符串 | 该标记用于同时具有属性元素或子元素的元素中的字符数据。 用户可以在架构中指定 valueTag 字段,或者当字符数据存在于具有其他元素或属性的元素中时,该字段将在架构推断期间自动添加。 也适用于 DataFrameWriter XML 选项。 |
encoding |
UTF-8 |
名称java.nio.charset.Charset |
若要读取,请根据给定的编码类型解码 XML 文件。 对于写入,请指定已保存的 XML 文件的编码(字符集)。 XML 内置函数会忽略此选项。 也适用于 DataFrameWriter XML 选项。 |
ignoreSurroundingSpaces |
true |
true、false |
是否必须跳过围绕值的空格。 将忽略只有空格的字符数据。 |
rowValidationXSDPath |
None | 文件路径字符串 | 可选 XSD 文件的路径,用于单独验证每行的 XML。 未能验证的行被视为分析错误。 XSD 不会影响架构,无论是指定的还是推断的。 |
ignoreNamespace |
false |
true、false |
如果为 true,则忽略 XML 元素和属性上的命名空间前缀。 例如,标记 <abc:author> 和 <def:author> 会被视为只是 <author>。 不能忽略 rowTag 元素上的命名空间,只忽略其读取子元素。 XML 解析不识别命名空间,即使在有 false 的情况下。 |
timestampFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] |
时间戳格式字符串 | 遵循 日期时间模式 格式的自定义时间戳格式字符串。 它适用于 timestamp 类型。 也适用于 DataFrameWriter XML 选项。 |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] |
时间戳格式字符串 | 遵循日期/时间模式格式的不带时区的自定义时间戳格式字符串。 这适用于 TimestampNTZType 类型。 也适用于 DataFrameWriter XML 选项。 |
dateFormat |
yyyy-MM-dd |
日期格式字符串 | 遵循 日期时间模式 格式的自定义日期格式字符串。 这适用于日期类型。 也适用于 DataFrameWriter XML 选项。 |
locale |
en-US |
IETF BCP 47 语言标记 | 将地区设置为 IETF BCP 47 格式的语言标签。 例如,在分析日期和时间戳时使用 locale。 |
nullValue |
字符串 null |
任意字符串 | 设置 null 值的字符串表示形式。 当为 null 时,分析程序不会为字段写入属性和元素。 也适用于 DataFrameWriter XML 选项。 |
readerCaseSensitive |
true |
true、false |
指定启用 rescuedDataColumn 时区分大小写的行为。 如果为 true,请拯救名称不同于架构的数据列。 如果为 false,则以不区分大小写的方式读取数据。 |
rescuedDataColumn |
None | 列名字符串 | 是否将因数据类型不匹配和架构不匹配(包括列大小写)而无法解析的所有数据收集到单独的列中。 使用自动加载程序时,默认包含此列。 有关更多详细信息,请参阅什么是被恢复的数据列?
COPY INTO(旧版)不支持已获救的数据列,因为无法使用 COPY INTO手动设置架构。 Databricks 建议对大多数引入方案使用自动加载程序。 |
singleVariantColumn |
none |
列名字符串 | 指定单个变体列的名称。 如果指定此选项用于读取,将整个 XML 记录解析为一个单个 Variant 列,并将该选项字符串值作为列名称。 如果为写入指定此选项,将单个 Variant 列的值写入 XML 文件。 也适用于 DataFrameWriter XML 选项。 |
useLegacyXMLParser |
true |
true、false |
是否使用旧版 XML 分析程序。 旧版分析程序对格式不正确的内容进行了不太严格的验证,但内存效率较低。
false设置为选择使用更严格的默认分析程序。 |
wildcardColName |
xs_any |
列名字符串 | 用于捕获与通配符 (xs:any) 架构元素匹配的 XML 元素的列名。 不能与 rescuedDataColumn.一起使用。 |
DataStreamReader 选项
使用这些选项 DataStreamReader.option() 配置 Delta Lake 表和其他基于文件的源的流式读取。
有关文件格式选项(JSON、CSV、Parquet 等),请参阅 DataFrameReader 选项。
有关自动加载程序(cloudFiles.*)选项,请参阅 自动加载程序。
Example
以下示例设置为 maxFilesPerTrigger10 Delta Lake 表流:
Python
df = spark.readStream.format("delta").option("maxFilesPerTrigger", 10).load("/path/to/delta-table")
Scala
val df = spark.readStream.format("delta").option("maxFilesPerTrigger", "10").load("/path/to/delta-table")
Common
以下选项适用于 Delta Lake 表和其他基于文件的流式处理源。
| 密钥 | 默认 | 有效值 | 说明 |
|---|---|---|---|
cleanSource |
off |
off、delete、archive |
如何在流处理源文件后处理源文件。
off 不执行任何操作。
delete 永久删除源文件。
archive 将文件移动到 sourceArchiveDir。
archive设置为时,sourceArchiveDir还必须设置。 不适用于 Delta Lake 表流式处理。 |
fileNameOnly |
false |
true、false |
是否仅按文件名而不是完整路径标识已处理的文件。 当 true,具有相同文件名的不同路径的文件将被视为同一文件,并且不会重新处理。 不适用于 Delta Lake 表流式处理。 |
latestFirst |
false |
true、false |
是否在每个微批处理中首先处理最近修改的文件。 在希望尽快处理最新数据时非常有用。 当和true或maxFilesPerTrigger已设置时maxBytesPerTrigger,maxFileAge将忽略。 不适用于 Delta Lake 表流式处理。 |
maxBytesPerTrigger |
None | 正整数 | 每个微批处理的数据量的软最大值。 如果最小输入单位超过限制,批处理可能会处理超过限制。 一起使用 maxFilesPerTrigger时,微批处理会处理数据,直到达到任何一个限制。对于自动加载程序,请改用 cloudFiles.maxBytesPerTrigger。 请参阅 “常见”。 |
maxCachedFiles |
10000 |
正整数或 0 |
要缓存后续微批处理的未处理文件的最大数量。 设置为 0 关闭缓存。 当源目录包含每个触发器的许多新文件时增加此值。 不适用于 Delta Lake 表流式处理。 |
maxFileAge |
7d |
持续时间字符串,例如 7d 或 4h |
考虑处理的文件的最大期限,相对于最近修改的文件的时间戳,而不是当前系统时间。 忽略超过此阈值的文件。 在设置latestFirst和true设置maxFilesPerTrigger时maxBytesPerTrigger被忽略。 不适用于 Delta Lake 表流式处理。 |
maxFilesPerTrigger |
1000 用于 Delta Lake 和自动加载程序。 对于其他基于文件的源,没有最大值。 |
正整数 | 每个微批处理中处理的新文件数上限。 一起使用 maxBytesPerTrigger时,微批处理会处理数据,直到达到任何一个限制。对于自动加载程序,请改用 cloudFiles.maxFilesPerTrigger。 请参阅 “常见”。 |
sourceArchiveDir |
None | 路径字符串 | 设置为 cleanSource 时archive存档目录的路径。 源文件在处理后移动到此路径,保留其相对目录结构。 不适用于 Delta Lake 表流式处理。 |
自动加载器
将这些选项与源配合使用 cloudFiles ,为从云存储进行流式引入配置 自动加载程序 。 特定于 cloudFiles 源的选项带有 cloudFiles 前缀,以将它们保留在与其他 结构化流 源选项不同的命名空间中。
Common
以下选项适用于所有自动加载程序配置。
| 密钥 | 默认 | 有效值 | 说明 |
|---|---|---|---|
cloudFiles.allowOverwrites |
false |
true、false |
是否允许输入目录文件更改替代现有数据。 有关配置注意事项,请参阅自动加载程序在追加或覆盖文件时是否再次处理文件? |
cloudFiles.backfillInterval |
None | 持续时间字符串,例如 1 day 或 1 week |
自动加载程序可以按给定间隔触发异步回填。 有关详细信息,请参阅 使用 cloudFiles.backfillInterval 触发常规回填。 请勿在 cloudFiles.useManagedFileEvents 被设置为 true 时使用。 |
cloudFiles.cleanSource |
OFF |
OFF、DELETE、MOVE |
是自动删除还是从输入目录中移动已处理的文件。 设置为 OFF (默认值)时,不会删除任何文件。设置为 DELETE时,自动加载程序会在处理文件 30 天后自动删除文件。 为此,自动加载程序必须具有对源目录的写入权限。设置为 MOVE 后,自动加载程序将在文件被 cloudFiles.cleanSource.moveDestination 处理后 30 天内自动移动到指定位置。 为此,自动加载程序必须对源目录以及移动位置具有写入权限。当文件在表值函数的结果 commit_time中具有非 null 值cloud_files_state时,会将其视为已处理。 请参阅 cloud_files_state 表值函数。 可以使用 cloudFiles.cleanSource.retentionDuration配置处理后的 30 天额外等待时间。启用 cloudFiles.cleanSource之前,请查看以下注意事项:
在 Databricks Runtime 16.4 及更高版本中可用。 |
cloudFiles.cleanSource.retentionDuration |
30 days |
CalendarInterval 字符串,例如14 days,2 weeks或1 month |
在已处理文件成为通过 cleanSource 进行存档的候选文件之前需等待的时间。 对于 DELETE 操作来说必须长于 7 天。 对于 MOVE 操作来说没有最低限制。在 Databricks Runtime 16.4 及更高版本中可用。 |
cloudFiles.cleanSource.moveDestination |
None | 云存储或 Unity 目录卷路径 | 当 cloudFiles.cleanSource 设置为 MOVE 时已处理文件的存档路径。 这可以是云存储路径或 Unity 目录卷 路径(例如 /Volumes/my_catalog/my_schema/my_volume/archive/)。移动位置必须:
自动加载程序必须对此目录具有写入权限。 在 Databricks Runtime 16.4 及更高版本中可用。 |
cloudFiles.format |
无(必需选项) |
avro、binaryFile、csv、json、orc、parquet、text、xml |
源路径中的数据文件格式。 有效值包括: |
cloudFiles.includeExistingFiles |
true |
true、false |
是包含流式处理输入路径中的现有文件,还是仅处理初始设置后到达的新文件。 仅在您首次启动流时会对该选项进行评估。 在重启流后更改此选项不起作用。 |
cloudFiles.inferColumnTypes |
false |
true、false |
在利用架构推理时是否推断确切的列类型。 默认情况下,在推断 JSON 和 CSV 数据集时,列将被推断为字符串。 有关更多详细信息,请参阅架构推理。 |
cloudFiles.maxBytesPerTrigger |
None | 字节字符串,例如 10g |
每次触发时要处理的最大新字节数。 这是一个软性最大值。 如果每个文件为 3 GB,则 Azure Databricks 在一个微批中可以处理 12 GB。 与 cloudFiles.maxFilesPerTrigger 一起使用时,Azure Databricks 会消耗到 cloudFiles.maxFilesPerTrigger 或 cloudFiles.maxBytesPerTrigger 中的最低限制(以先到达者为准)。 与 Trigger.Once()(Trigger.Once() 已弃用)一起使用时,此选项不起作用。在 Databricks Runtime 18.0 及更高版本中,此选项是动态配置的,不需要手动设置。 |
cloudFiles.maxFileAge |
None | 持续时间字符串 | 出于重复数据删除目的而跟踪文件事件的时长。 Databricks 建议不要优化此参数,除非你是在以每小时数百万个文件的速度引入数据。 有关更多详细信息,请参阅 文件事件跟踪 部分。 过于激进地调整 cloudFiles.maxFileAge 可能会导致数据质量问题,例如重复引入或缺少文件。 因此,Databricks 建议为 cloudFiles.maxFileAge 使用保守设置,例如 90 天,这与类似数据引入解决方案建议的值相当。 |
cloudFiles.maxFilesPerTrigger |
1000 |
正整数 | 要在每个触发器中处理的最大新文件数。 与 cloudFiles.maxBytesPerTrigger 一起使用时,Azure Databricks 会消耗到 cloudFiles.maxFilesPerTrigger 或 cloudFiles.maxBytesPerTrigger 中的最低限制(以先到达者为准)。 与 Trigger.Once()(已弃用)一起使用时,此选项不起作用。在 Databricks Runtime 18.0 及更高版本中,此选项是动态配置的,不需要手动设置。 |
cloudFiles.partitionColumns |
None | 以逗号分隔的列名称列表 | 要从文件的目录结构推断的 Hive 样式分区列的逗号分隔列表。 Hive 样式的分区列是由等号(例如 <base-path>/a=x/b=1/c=y/file.format)组合的键值对。 在此示例中,分区列为 a、b 和 c。 如果使用架构推理并指定 <base-path> 从中加载数据,则这些列会自动添加到架构中。 如果指定架构,自动加载程序要求这些列包含在架构中。 如果你不希望这些列成为架构的一部分,则可以指定 "" 以忽略这些列。 此外,当你希望在复杂目录结构中通过文件路径推断列时,可以使用此选项,如下面的示例所示:<base-path>/year=2022/week=1/file1.csv<base-path>/year=2022/month=2/day=3/file2.csv<base-path>/year=2022/month=2/day=4/file3.csv将 cloudFiles.partitionColumns 指定为 year,month,day 会针对 year=2022 返回 file1.csv,但 month 和 day 列为 null。month 和 day 已正确解析为 file2.csv 和 file3.csv。 |
cloudFiles.schemaEvolutionMode |
addNewColumns 如果未指定架构, none 则为 |
addNewColumns、none、rescue、failOnNewColumns |
在数据中发现新列时对架构进行演变的模式。 默认情况下,在推断 JSON 数据集时,将列推断为字符串。 有关更多详细信息,请参阅架构演变。 |
cloudFiles.schemaHints |
None | 架构字符串 | 在架构推理过程中指定给自动加载程序的架构信息。 有关更多详细信息,请参阅架构提示。 |
cloudFiles.schemaLocation |
无(推断架构所需的) | 路径字符串 | 存储推断出的架构和后续更改的位置。 有关更多详细信息,请参阅架构推理。 |
cloudFiles.useStrictGlobber |
false |
true、false |
是否使用与 Apache Spark 中其他文件源的默认通配行为相匹配的严格通配符。 有关更多详细信息,请参阅常见数据加载模式。 在 Databricks Runtime 12.2 LTS 及更高版本中可用。 |
cloudFiles.validateOptions |
true |
true、false |
是否对自动加载器选项进行验证,并对未知或不一致的选项返回错误。 |
目录列表
使用目录列表模式时,适用以下选项。
| 密钥 | 默认 | 有效值 | 说明 |
|---|---|---|---|
cloudFiles.useIncrementalListing(已弃用) |
auto 在 Databricks Runtime 17.2 及更低版本中, false 在 Databricks Runtime 17.3 及更高版本上 |
auto、true、false |
此功能已弃用。 Databricks 建议将 文件通知模式用于文件事件 而不是使用 cloudFiles.useIncrementalListing。是否在目录列表模式下使用增量列表而不是完整列表。 默认情况下,自动加载程序尽最大努力自动检测给定目录是否适用于增量列表。 可以显式使用增量列表,或者通过将完整目录列表分别设置为 true 或 false 来使用该列表。错误地在非按词汇排序的目录上启用增量列表会阻止自动加载程序发现新文件。 适用于 Azure Data Lake Storage( abfss://)、S3(s3://)和 GCS(gs://)。在 Databricks Runtime 9.1 LTS 及更高版本中可用。 |
文件通知
有关配置文件通知模式(包括所需的云权限、设置说明和身份验证方法)的信息,请参阅 在文件通知模式下配置自动加载程序流。
| 密钥 | 默认 | 有效值 | 说明 |
|---|---|---|---|
cloudFiles.fetchParallelism |
1 |
正整数 | 从队列服务中提取消息时要使用的线程数。 请勿在 cloudFiles.useManagedFileEvents 被设置为 true 时使用。 |
cloudFiles.pathRewrites |
None | JSON 映射字符串 | 仅当指定接收 queueUrl 来自多个 S3 存储桶的文件通知并且想要使用配置为访问这些容器中的数据的装入点时,才是必需的。 借助此选项可以使用装入点重写 bucket/key 路径的前缀。 只能重写前缀。 例如,对于配置 {"<databricks-mounted-bucket>/path": "dbfs:/mnt/data-warehouse"},路径 s3://<databricks-mounted-bucket>/path/2017/08/fileA.json 将重写为 dbfs:/mnt/data-warehouse/2017/08/fileA.json。请勿在 cloudFiles.useManagedFileEvents 被设置为 true 时使用。 |
cloudFiles.resourceTag |
None | 键值标记字符串 | 一系列键值标记对,可帮助关联和识别相关资源,例如:cloudFiles.option("cloudFiles.resourceTag.myFirstKey", "myFirstValue") .option("cloudFiles.resourceTag.mySecondKey", "mySecondValue")请勿在 cloudFiles.useManagedFileEvents 被设置为 true 时使用。 而是使用云提供程序控制台设置资源标记。有关详细信息,请参阅 云提供程序资源标记。 |
cloudFiles.useManagedFileEvents |
false |
true、false |
设置为 true时,Auto Loader 使用文件事件服务来检测外部位置中的文件。 仅当加载路径位于启用了文件事件的外部位置时,才能使用此选项。 请参阅 将文件通知模式与文件事件配合使用。文件事件在文件发现中提供通知级性能,因为自动加载程序可以在上次运行后发现新文件。 与目录列表不同,此过程不需要列出目录中的所有文件。 在某些情况下,即使启用了文件事件选项,自动加载程序也使用目录列表:
|
cloudFiles.listOnStart |
false |
true、false |
设置为 CF_MANAGED_FILE_EVENTS_INVALID_CONTINUATION_TOKEN中恢复。 请参阅如何从CF_MANAGED_FILE_EVENTS_INVALID_CONTINUATION_TOKEN错误中恢复? |
cloudFiles.useNotifications |
false |
true、false |
是否使用文件通知模式来确定何时存在新文件。 如果为 false,则使用目录列表模式。 请参阅比较自动加载器文件检测模式。请勿在 cloudFiles.useManagedFileEvents 被设置为 true 时使用。 |
云提供商资源标记
默认情况下,自动加载程序会尽力添加以下键值标记对:
-
vendor:Databricks -
path:从中加载数据的位置。 由于标签限制,在 GCP 中不可用。 -
checkpointLocation:流检查点的位置。 由于标签限制,在 GCP 中不可用。 -
streamId:流的全局唯一标识符。
Databricks 保留这些密钥名称,并且无法覆盖其值。
有关 Azure 的详细信息,请参阅命名队列和元数据以及properties.labels中的覆盖范围。 自动加载程序将这些键值标记对以 JSON 格式存储为标签。
特定于云
自动加载程序提供了用于为文件通知模式配置云基础结构的选项。 有关所需的云权限和设置说明,请参阅 在文件通知模式下配置自动加载程序流。
蔚蓝
如果指定 cloudFiles.useNotifications = true 并且希望自动加载程序为你设置通知服务,则必须为以下所有选项指定值:
| 密钥 | 默认 | 有效值 | 说明 |
|---|---|---|---|
cloudFiles.resourceGroup |
None | 任意字符串 | 在其中创建存储帐户的Azure资源组。 |
cloudFiles.subscriptionId |
None | 任意字符串 | 在其中创建资源组的Azure订阅 ID。 |
databricks.serviceCredential |
None | 任意字符串 | 您的 Databricks 服务凭据的名称。 在 Databricks Runtime 16.1 及更高版本中可用。 |
如果 Databricks 服务凭据不可用,可以改为指定以下身份验证选项:
| 密钥 | 默认 | 有效值 | 说明 |
|---|---|---|---|
cloudFiles.clientId |
None | 任意字符串 | 服务主体的客户端 ID 或应用程序 ID。 |
cloudFiles.clientSecret |
None | 任意字符串 | 服务主体的客户端密钥。 |
cloudFiles.connectionString |
None | 一个连接字符串 | 存储帐户的连接字符串,基于帐户访问密钥或共享访问签名 (SAS)。 |
cloudFiles.tenantId |
None | 任意字符串 | 在其中创建服务主体的Azure租户 ID。 |
仅当设置 cloudFiles.useNotifications = true 并且希望自动加载程序使用现有队列时,才指定以下选项:
| 密钥 | 默认 | 有效值 | 说明 |
|---|---|---|---|
cloudFiles.queueName |
None | 任意字符串 | Azure 队列的名称。 如果指定,云文件源将直接使用此队列中的事件,而不是设置自己的Azure 事件网格和队列存储服务。 在这种情况下,databricks.serviceCredential 或 cloudFiles.connectionString 只需要对队列具有读取权限。 |
Delta Lake
使用以下 spark.readStream选项从 Delta Lake 表读取时适用。
| 密钥 | 默认 | 有效值 | 说明 |
|---|---|---|---|
allowSourceColumnDrop |
None | 版本号或 always |
设置为 Delta 表版本号或 always 允许流在从源表架构中删除列后继续。 当设置为版本号时,确认所有架构更改都达到该版本。 需要 schemaTrackingLocation。 请参阅 有关使用 Delta Lake 列映射重命名和删除列的说明。 |
allowSourceColumnRename |
None | 版本号或 always |
设置为 Delta 表版本号或 always 允许流在源表中重命名列后继续。 当设置为版本号时,确认所有架构更改都达到该版本。 需要 schemaTrackingLocation。 请参阅 有关使用 Delta Lake 列映射重命名和删除列的说明。 |
allowSourceColumnTypeChange |
None | 版本号或 always |
设置为 Delta 表版本号或 always 允许流在源表中更改列类型后继续。 当设置为版本号时,确认所有架构更改都达到该版本。 需要 schemaTrackingLocation。 请参阅类型扩展。 |
excludeRegex |
None | Java正则表达式字符串 | 正则表达式模式。 与模式匹配的文件从流读取中排除。 用于筛选不符合预期命名约定的文件。 |
failOnDataLoss |
true |
true、false |
如果源数据由于日志保留()而删除了流式处理查询,logRetentionDuration是否失败。 设置为 false 跳过缺失的数据并继续处理。 请参阅为“按时间顺序查看”查询配置数据保留。 |
ignoreChanges(已弃用) |
false |
true、false |
在 Databricks Runtime 11.3 LTS 和更低版本中可用。 在修改操作(如UPDATE、MERGE INTODELETE或OVERWRITE)后重新发出重写数据文件。 可以与新行一起发出未更改的行,因此下游使用者必须处理重复项。 删除操作不会传播到下游。 替换为 skipChangeCommits Databricks Runtime 12.2 LTS 及更高版本。 |
ignoreDeletes(已弃用) |
false |
true、false |
忽略在分区边界处删除数据的事务(仅删除完整分区)。 不处理非分区删除、更新或其他修改。 请改用 skipChangeCommits。 |
readChangeFeed 或 readChangeData |
false |
true、false |
是否启用读取流式处理查询的更改数据馈送。 启用后,流会发出包含其他元数据列的行级更改(插入、更新和删除)。 请参阅在 Azure Databricks 中使用更改数据馈送。 |
schemaTrackingLocation |
None | 路径字符串 | Delta Lake 跟踪流读取的架构更改的目录的路径。 当从启用了列映射的表进行流式处理以及使用 allowSourceColumn* 选项处理架构演变时是必需的。 必须位于 checkpointLocation 流式处理查询中。 请参阅 有关使用 Delta Lake 列映射重命名和删除列的说明。 |
skipChangeCommits |
false |
true、false |
忽略删除或修改现有记录和进程仅追加的事务。 Databricks 建议对不使用更改数据馈送的大多数工作负荷使用此选项。 在 Databricks Runtime 12.2 LTS 及更高版本中可用。 请参阅 使用 跳过上游更改提交 skipChangeCommits。 |
startingTimestamp |
最新可用 | 时间戳字符串,例如 2019-01-01T00:00:00.000Z 或日期字符串,例如 2019-01-01 |
要从中读取的时间戳。 该流读取指定时间戳或之后提交的所有表更改。 如果时间戳位于所有可用表提交之前,则流从最早的可用提交开始。 不能与 startingVersion.一起使用。 如果流式处理检查点已存在,则忽略。 |
startingVersion |
最新可用 | 正整数, 0或 latest |
要从中读取的增量表版本。 该流读取指定版本或之后提交的所有更改。 指定 latest 仅从最近的更改开始。 不能与 startingTimestamp.一起使用。 如果流式处理检查点已存在,则忽略。 参见 使用表历史记录。 |
withEventTimeOrder |
false |
true、false |
将初始表快照划分为事件时间桶,以防止记录被错误地标记为后期事件,并在带有水印的有状态查询中丢弃。 在开始初始快照处理后无法更改,而无需删除检查点。 在 Databricks Runtime 11.3 LTS 及更高版本中可用。 请参阅 “处理初始快照而不删除数据”。 |
Kafka
将这些选项与以下选项一起使用: spark.readStream.format("kafka") 或 spark.read.format("kafka"):
| 密钥 | 默认 | 有效值 | 说明 |
|---|---|---|---|
assign |
None | JSON 字符串,例如 {"topicA":[0,1],"topicB":[2,4]} |
要使用的特定分区。 必须准确指定其中一个subscribesubscribePattern或assign选项。 |
failOnDataLoss |
true |
true、false |
如果数据可能由于已删除主题或偏移截断而丢失,则查询是否失败。 设置为 false 跳过缺失的数据并继续。Databricks 保守地估计数据是否丢失。 但是,这可能会导致误报。 |
fetchoffset.numretries |
3 |
正整数或 0 |
提取 Kafka 偏移量失败时重试次数。 |
fetchoffset.retryintervalms |
1000 |
正整数或 0 |
偏移提取重试之间的间隔(以毫秒为单位)。 |
groupIdPrefix |
spark-kafka-source (流式处理), spark-kafka-relation (批处理) |
任意字符串 | 用于自动生成的 Kafka 使用者组 ID 的自定义前缀。 如果 kafka.group.id 显式设置,连接器将忽略此选项。 |
kafka.group.id |
None | 任意字符串 | 读取时要使用的 Kafka 使用者组 ID。 请谨慎使用:共享同一组 ID 的查询相互干扰,并且可能只读取部分数据。 当运行并发批处理和流式处理工作负荷或快速重启查询时,可能会发生这种情况。 如果设置,则会忽略 groupIdPrefix。 若要最大程度地减少问题,请将 Kafka 使用者配置 session.timeout.ms 设置为较小的值。 |
includeHeaders |
false |
true、false |
是否将 Kafka 消息标头作为列包含在输出中。 |
kafkaconsumer.polltimeoutms |
None | 正整数 | Kafka 使用者 poll() 调用的超时(以毫秒为单位)。 |
kafka.bootstrap.servers |
None | 字符串的 host:port 逗号分隔列表 |
Kafka 中转站的 host:port 地址的逗号分隔列表。 设置 Kafka 客户端 bootstrap.servers 的属性。如果发现 Kafka 中没有数据,请检查此中转站地址列表以获取不正确的地址。 如果中转站地址列表不正确,则可能没有任何错误。 Kafka 客户端假设代理最终可用,并在收到网络错误时重试。 |
maxRecordsPerPartition |
None | 正整数 | 每个 Spark 分区的最大记录数。 设置后,连接器会拆分 Kafka 分区,以便每个 Spark 分区最多读取这多条记录。 还可以将此选项与 minPartitions.. 当这两个选项都设置时,Spark 使用哪个选项会导致更多分区。 |
minPartitions |
None | 正整数 | 要从 Kafka 读取的 Spark 分区的最小数目。 设置后,连接器会拆分大型 Kafka 分区以提高并行度。 如果未设置,Spark 会为每个 Kafka 主题分区创建一个分区。 用于处理数据倾斜或峰值负载。 此选项重新初始化每个触发器的 Kafka 使用者,这可能会影响 SSL 的性能。 |
startingOffsets |
latest (流式处理), earliest (批处理) |
earliest、 latest或 JSON 偏移字符串 |
查询开始读取的偏移量。 在 JSON 字符串中, -1 是最新的偏移量。
-2 是最早的偏移量。 例如: {"topicA":{"0":23,"1":-2}}。对于流式处理查询,此选项仅适用于新查询启动时。 恢复的查询始终使用检查点。 在查询期间,新分区开始读取最早的偏移量。 对于批处理查询, latest 不允许使用。 |
startingOffsetsByTimestamp |
None | JSON 时间戳字符串,例如 {"topicA":{"0":1000,"1":2000}} |
每个分区的起始偏移量列表,指定为时间戳(以毫秒为单位)。 如果时间戳不存在偏移量,查询行为由 .startingOffsetsByTimestampStrategy对于流式处理查询,此选项仅适用于新查询启动时。 恢复的查询始终使用检查点。 在查询期间,新分区开始读取最早的偏移量。 |
startingOffsetsByTimestampStrategy |
error |
error、latest |
为指定startingOffsetsByTimestampstartingTimestamp时间戳找到偏移量时要使用的策略。
error 引发异常。
latest 使用最新的可用偏移量。 |
startingTimestamp |
None | 正整数或 0 |
应用于所有分区的全局起始时间戳(以毫秒为单位)。 如果时间戳不存在偏移量,则行为由 .startingOffsetsByTimestampStrategy |
subscribe |
None | 以逗号分隔的主题名称列表 | 要订阅的主题。 必须准确指定其中一个subscribesubscribePattern或assign选项。 |
subscribePattern |
None | Java正则表达式字符串 | 用于订阅主题的模式。 必须准确指定其中一个subscribesubscribePattern或assign选项。 例如,topic.*。 |
以下选项仅适用于流式读取:spark.readStream.format("kafka")
| 密钥 | 默认 | 有效值 | 说明 |
|---|---|---|---|
bytesEstimateWindowLength |
300s |
持续时间字符串,例如 10m 或 600s |
用于估算指标剩余字节 estimatedTotalBytesBehindLatest 的时间范围。 请参阅检索 Kafka 指标。 |
maxOffsetsPerTrigger |
None | 正整数 | 每个触发器间隔处理的最大偏移量。 偏移量按比例分布于主题分区。 |
maxTriggerDelay |
15m |
持续时间字符串,例如 10m 或 600s |
在触发之前等待 minOffsetsPerTrigger 累积的最长时间。 |
minOffsetsPerTrigger |
None | 正整数 | 触发微批处理之前要累积的最小偏移量。 达到此时间 maxTriggerDelay 后,无论何时运行微批处理。 |
有关仅适用于批处理读取的 spark.read.format("kafka")偏移选项,请参阅 DataFrameReader Kafka 选项。
身份验证
Databricks 建议使用 Unity 目录服务凭据向云托管的 Kafka 服务(AWS MSK、Azure 事件中心 或 Google Cloud Managed Kafka)进行身份验证。
| 密钥 | 默认 | 有效值 | 说明 |
|---|---|---|---|
databricks.serviceCredential |
None | 任意字符串 | 用于向云管理的 Kafka 服务进行身份验证的 Unity 目录 服务凭据 的名称。 在 Databricks Runtime 16.1 及更高版本中可用。 |
databricks.serviceCredential.scope |
None | 任意字符串 | 服务凭据的 OAuth 范围。 仅当Azure Databricks无法自动推断 Kafka 服务的范围时设置此项。 |
当服务凭据不可用时,请使用 SASL/SSL 选项(作为 kafka.* 属性传递)。 使用服务凭据时,无需指定kafka.sasl.mechanism或kafka.sasl.jaas.configkafka.security.protocol指定。
| 密钥 | 默认 | 有效值 | 说明 |
|---|---|---|---|
kafka.security.protocol |
None | 安全协议字符串,例如SASL_SSL, SSLPLAINTEXT |
代理通信的安全协议。 |
kafka.sasl.mechanism |
None | SASL 机制字符串,例如PLAIN,、SCRAM-SHA-256、SCRAM-SHA-512、 OAUTHBEARERAWS_MSK_IAM |
SASL 机制。 |
kafka.sasl.jaas.config |
None | JAAS 配置字符串 | JAAS 登录配置字符串。 |
kafka.sasl.login.callback.handler.class |
None | 完全限定的类名 | SASL 身份验证的登录回调处理程序的完全限定类名。 |
kafka.sasl.client.callback.handler.class |
None | 完全限定的类名 | 用于 SASL 身份验证的客户端回调处理程序的完全限定类名。 |
kafka.ssl.truststore.location |
None | 文件路径字符串 | SSL 信任存储文件的路径。 |
kafka.ssl.truststore.password |
None | 任意字符串 | SSL 信任存储文件的密码。 |
kafka.ssl.keystore.location |
None | 文件路径字符串 | SSL 密钥存储文件的路径。 |
kafka.ssl.keystore.password |
None | 任意字符串 | SSL 密钥存储文件的密码。 |
有关完整的身份验证设置说明,请参阅 “身份验证”。
Pub/Sub
使用这些选项订阅 spark.readStream.format("pubsub") Google Pub/Sub。 选项subscriptionIdtopicId和projectId必需选项。
| 密钥 | 默认 | 有效值 | 说明 |
|---|---|---|---|
subscriptionId |
None | 任意字符串 | 必填。 Pub/Sub 订阅 ID。 连接器创建订阅(如果不存在)。 |
topicId |
None | 任意字符串 | 必填。 Pub/Sub 主题 ID。 |
projectId |
None | 任意字符串 | 必填。 Google Cloud 项目 ID。 |
numFetchPartitions |
流初始化时可用的执行程序数的一半 | 正整数 | 从订阅中提取行的并行 Spark 任务数。 |
maxBytesPerTrigger |
None | 正整数 | 每个微批处理要处理的字节数的软限制。 |
maxRecordsPerFetch |
1000 |
正整数 | 在处理之前,要提取每个任务的行数。 |
maxFetchPeriod |
10s |
持续时间字符串,例如 1s 或 1m |
每个任务在处理行之前进行获取的时间长度。 Azure Databricks建议使用默认值。 |
deleteSubscriptionOnStreamStop |
false |
true、false |
当 true流式处理查询结束时,订阅将从 subscriptionId中删除。 |
serviceCredential |
None | 任意字符串 | 用于向 Pub/Sub 进行身份验证的Azure Databricks服务凭据的名称。 在 Databricks Runtime 16.1 及更高版本中可用。 |
clientEmail |
None | 电子邮件地址字符串 | Google 服务帐户的电子邮件地址。 不使用服务凭据时是必需的。 |
clientId |
None | 任意字符串 | Google 服务帐户的客户端 ID。 不使用服务凭据时是必需的。 |
privateKey |
None | 私钥字符串 | Google 服务帐户的私钥。 不使用服务凭据时是必需的。 |
privateKeyId |
None | 任意字符串 | Google 服务帐户的私钥 ID。 不使用服务凭据时是必需的。 |
Pulsar
使用这些选项 spark.readStream.format("pulsar") 从 Apache Pulsar 流式传输。 在 Databricks Runtime 14.1 及更高版本中可用。
需要以下选项。 必须准确指定其中一个topic或 topicstopicsPattern。
| 密钥 | 默认 | 有效值 | 说明 |
|---|---|---|---|
service.url |
None | Pulsar 服务 URL 字符串 | 例如,Pulsar 服务的 serviceURLPulsarpulsar://broker.example.com:6650。 |
topic |
None | 任意字符串 | 要使用的单一主题名称。 |
topics |
None | 以逗号分隔的主题名称列表 | 要使用的主题名称的逗号分隔列表。 |
topicsPattern |
None | Java正则表达式字符串 | Java正则表达式字符串,以匹配主题名称。 |
还支持以下选项:
| 密钥 | 默认 | 有效值 | 说明 |
|---|---|---|---|
admin.url |
None | URL 字符串 | Pulsar 管理服务 HTTP URL。 设置时 maxBytesPerTrigger 是必需的。 |
allowDifferentTopicSchemas |
false |
true、false |
如果读取了具有不同架构的多个主题,请使用此选项关闭基于架构的自动主题值反序列化。 仅当此项为 true 时,才会返回原始值。 |
failOnDataLoss |
true |
true、false |
数据丢失时是否失败查询。 例如,删除主题或消息因保留策略而过期时,可能会丢失数据。 |
maxBytesPerTrigger |
None | 正整数 | 每个微批处理要处理的字节数的软限制。 需要 admin.url。 |
pollTimeoutMs |
120000 |
正整数 | 从 Pulsar 读取消息的超时时间(以毫秒为单位)。 |
predefinedSubscription |
None | 任意字符串 | 连接器用于跟踪 Spark 应用程序进度的预定义订阅名称。 |
startingOffsets |
latest |
latest、 earliest或 JSON 偏移字符串 |
从何处开始阅读。 |
subscriptionPrefix |
None | 任意字符串 | 连接器用于生成随机订阅以跟踪 Spark 应用程序进度的前缀。 |
waitingForNonExistedTopic |
false |
true、false |
连接器是否等待,直到创建所需的主题。 |
可以使用以下选项模式指定其他 Pulsar 客户端、管理员和读取器配置:
| Pattern | 配置选项 |
|---|---|
pulsar.admin.* |
Pulsar 管理员配置 |
pulsar.client.* |
Pulsar 客户端配置,包括身份验证选项,例如 pulsar.client.authPluginClassName 和 pulsar.client.authParams。 |
pulsar.reader.* |
Pulsar 读取器配置 |
有关 Pulsar 客户端和管理员身份验证选项的详细信息,请参阅 身份验证。
身份验证
Azure Databricks 支持向 Pulsar 进行信任存储和密钥存储身份验证。 Azure Databricks建议使用机密来存储身份验证详细信息。 请参阅机密管理。
| 密钥 | 默认 | 有效值 | 说明 |
|---|---|---|---|
pulsar.client.authPluginClassName |
None | 完全限定的类名 | 身份验证插件的完全限定类名。 例如,org.apache.pulsar.client.impl.auth.AuthenticationTls。 |
pulsar.client.authParams |
None | 凭据字符串 | 作为字符串传递给身份验证插件的身份验证凭据。 例如,tlsCertFile:/path/to/my-role.cert.pem,tlsKeyFile:/path/to/my-role.key-pk8.pem。 |
pulsar.client.useKeyStoreTls |
false |
true、false |
启用 true基于 KeyStore 的 TLS 配置而不是 PEM 格式化文件时。 |
pulsar.client.tlsTrustStoreType |
None | 任意字符串 | TLS 信任存储文件的格式。 例如,JKS。 |
pulsar.client.tlsTrustStorePath |
None | 文件路径字符串 | 包含受信任 CA 证书的 TLS 信任存储文件的路径。 当 pulsar.client.useKeyStoreTls 是 true 时为必需项。 |
pulsar.client.tlsTrustStorePassword |
None | 任意字符串 | TLS 信任存储文件的密码。 |
如果流使用 a PulsarAdmin,还可以设置以下选项:
| 密钥 | 默认 | 有效值 | 说明 |
|---|---|---|---|
pulsar.admin.authPluginClassName |
None | 完全限定的类名 | Pulsar 管理员客户端的身份验证插件的完全限定类名。 |
pulsar.admin.authParams |
None | 凭据字符串 | Pulsar 管理员客户端身份验证插件的身份验证凭据。 |
pulsar.admin.useTls |
None |
true、false |
是否对 Pulsar 管理员客户端连接使用 TLS。 |
pulsar.admin.tlsAllowInsecureConnection |
None |
true、false |
是否允许 Pulsar 管理客户端的不安全 TLS 连接。 |
pulsar.admin.tlsTrustCertsFilePath |
None | 文件路径字符串 | Pulsar 管理员客户端的受信任 TLS 证书文件的路径。 |
pulsar.admin.useKeyStoreTls |
None |
true、false |
是否对 Pulsar 管理客户端使用基于 KeyStore 的 TLS。 |
pulsar.admin.tlsTrustStoreType |
None | 任意字符串 | Pulsar 管理客户端的 TLS 信任存储的格式。 例如,JKS。 |
pulsar.admin.tlsTrustStorePath |
None | 文件路径字符串 | Pulsar 管理员客户端的 TLS 信任存储文件的路径。 当 pulsar.admin.useKeyStoreTls 是 true 时为必需项。 |
pulsar.admin.tlsTrustStorePassword |
None | 任意字符串 | Pulsar 管理员客户端 TLS 信任存储的密码。 |
有关身份验证示例,请参阅 “向 Pulsar 进行身份验证”。
DataFrameWriter 选项
将这些选项用于 DataFrameWriter.option() 和 DataFrameWriterV2.option()来控制Azure Databricks写入数据的方式。
Example
以下示例设置为mergeSchemaTrue写入 Delta Lake 表:
Python
df.write.format("delta").option("mergeSchema", True).saveAsTable("my_table")
Scala
df.write.format("delta").option("mergeSchema", "true").saveAsTable("my_table")
Avro
编写 Avro 文件时,以下选项适用。
| 密钥 | 默认 | 有效值 | 说明 |
|---|---|---|---|
avroSchema |
None | JSON 架构字符串 | 完整的 Avro 架构作为 JSON 字符串。 使用此选项可将 Spark SQL 类型转换为特定的 Avro 类型。 适用于 读取和写入 Avro 文件。 |
avroSchemaUrl |
None | URL 字符串 | 指向 Avro 架构文件的 URL。 使用,而不是 avroSchema 在外部存储架构时使用。 与 avroSchema 互斥。 适用于 读取和写入 Avro 文件。 |
compression |
snappy |
uncompressed、deflate、snappy (default)、bzip2、xz、zstandard |
编写时要使用的压缩编解码器。 适用于 读取和写入 Avro 文件。 |
recordName |
topLevelRecord |
任意字符串 | 输出 Avro 架构中的顶级记录名称。 适用于 读取和写入 Avro 文件。 |
positionalFieldMatching |
false |
true、false |
是否按字段位置而不是按名称匹配 Spark 架构和 Avro 架构之间的列。 适用于 读取和写入 Avro 文件。 |
recordNamespace |
空字符串 | 任意字符串 | 输出 Avro 架构中顶级记录的命名空间。 适用于 读取和写入 Avro 文件。 |
Delta Lake 和 Apache Iceberg
编写 Delta Lake 和 Apache Iceberg 表时,以下选项适用。
| 密钥 | 默认 | 有效值 | 说明 |
|---|---|---|---|
clusterByAuto |
false |
true、false |
是否启用自动液体聚类分析,其中Azure Databricks根据查询模式选择聚类分析列。 仅对 . 有效。mode("overwrite") 不能与模式一起使用 append 。 在 Databricks Runtime 16.4 及更高版本中可用。 适用于 对表使用液体聚类分析。 |
mergeSchema |
None |
true、false |
是否为写入操作启用架构演变。 源 DataFrame 中的新列将添加到目标表架构。 适用于批处理和流式处理追加。 适用于 更新表架构。 |
overwriteSchema |
None |
true、false |
是否在覆盖时替换表架构和分区。 需要 mode("overwrite") 没有 replaceWhere。 不能与 partitionOverwriteMode 一起使用。 适用于 更新表架构。 |
partitionOverwriteMode |
None |
static、dynamic |
分区覆盖模式。 将其设置为 dynamic 仅覆盖包含新数据的分区,使所有其他分区保持不变。 旧模式,在无服务器计算或 Databricks SQL 上不受支持。 适用于 选择性地使用 Delta Lake 覆盖数据。 |
replaceOn |
None | 布尔表达式字符串 | 一个布尔表达式,与目标表中的行匹配,以替换为源查询中的行。 可以引用目标表和源查询中的列。 将删除并替换与源行匹配的目标中的行。 如果源为空,则不会发生删除操作。 用于 targetAlias 消除列引用的歧义。 在 Databricks Runtime 17.1 及更高版本中可用。 适用于 选择性地使用 Delta Lake 覆盖数据。 |
replaceUsing |
None | 以逗号分隔的列名称列表 | 用于匹配目标表和源查询之间的行的列名的逗号分隔列表。 目标和源必须包含所有列出的列。 在相等比较下与源行匹配的目标中的行将被删除并替换。
NULL 值被视为不相等且不匹配。 在 Databricks Runtime 16.3 及更高版本中可用。 适用于 选择性地使用 Delta Lake 覆盖数据。 |
replaceWhere |
None | 谓词表达式字符串 | 谓词表达式。 以原子方式仅覆盖与谓词匹配的记录。 适用于 选择性地使用 Delta Lake 覆盖数据。 |
targetAlias |
None | 任意字符串 | 目标表的字符串别名。
replaceOn当条件同时引用目标表和源查询中的列时,请使用或replaceWhere消除列引用的歧义。 适用于 选择性地使用 Delta Lake 覆盖数据。 |
txnAppId |
None | 任意字符串 | 用于标识应用程序中幂等写入 foreachBatch 操作的应用程序的唯一字符串。 与 一 txnVersion 起使用,确保对多个 Delta Lake 表进行恰好一次的写入。
foreachBatch适用于幂等表写入。 |
txnVersion |
None | 单调递增整数 | 单调增加的数字,用作操作中幂等写入的 foreachBatch 事务版本。 与 一 txnAppId 起使用,确保对多个 Delta Lake 表进行恰好一次的写入。
foreachBatch适用于幂等表写入。 |
optimizeWrite |
None |
true、false |
是否为此写入操作启用自动优化写入。 重写 spark.databricks.delta.optimizeWrite.enabled 配置。 适用于 Azure Databricks? 中的 Delta Lake 是什么。 |
userMetadata |
None | 任意字符串 | 追加到写入操作的提交元数据中的用户定义的字符串。 在 . 的 DESCRIBE HISTORY输出中可见 适用于 使用自定义元数据扩充表。 |
CSV
编写 CSV 文件时,以下选项适用。
| 密钥 | 默认 | 有效值 | 说明 |
|---|---|---|---|
charToEscapeQuoteEscaping |
\0 (未启用) |
单个字符 | 与引号字符不同时,用于转义转义字符的字符。 适用于 csv(DataFrameWriter)。 |
compression |
none |
none (default)、bzip2、gzip、lz4、snappy、deflate、zstd |
编写时要使用的压缩编解码器。 适用于 csv(DataFrameWriter)。 |
dateFormat |
yyyy-MM-dd |
日期格式字符串 | 日期列值的格式字符串。 适用于 csv(DataFrameWriter)。 |
emptyValue |
空字符串 | 任意字符串 | 为空(非 null)值编写的字符串。 适用于 csv(DataFrameWriter)。 |
encoding |
UTF-8 |
名称java.nio.charset.Charset |
输出文件的字符编码。 适用于 csv(DataFrameWriter)。 |
escape |
\ |
单个字符 | 用于转义带引号值的字符。 适用于 csv(DataFrameWriter)。 |
escapeQuotes |
true |
true、false |
是否转义带引号字段值内的引号字符。 适用于 csv(DataFrameWriter)。 |
header |
false |
true、false |
是否将列名称写入输出的第一行。 适用于 csv(DataFrameWriter)。 |
ignoreLeadingWhiteSpace |
false |
true、false |
是否在写入时从值中剪裁前导空格。 适用于 csv(DataFrameWriter)。 |
ignoreTrailingWhiteSpace |
false |
true、false |
是否在写入时从值中剪裁尾随空格。 适用于 csv(DataFrameWriter)。 |
lineSep |
\n |
一个字符串 | 记录之间使用的行分隔符字符串。 适用于 csv(DataFrameWriter)。 |
locale |
en-US |
标识符java.util.Locale |
一个 java.util.Locale 标识符。 标识Java区域设置,影响 CSV 中的默认日期、时间戳和十进制分析。 |
nullValue |
空字符串 | 任意字符串 | 为 null 值编写的字符串。 适用于 csv(DataFrameWriter)。 |
quote |
" |
单个字符 | 用于引用包含分隔符的字段值的字符。 适用于 csv(DataFrameWriter)。 |
quoteAll |
false |
true、false |
是否将所有字段值括在引号中,而不考虑内容。 适用于 csv(DataFrameWriter)。 |
sep |
, |
一个字符串 | 字段分隔符字符。 适用于 csv(DataFrameWriter)。 |
timestampFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] |
时间戳格式字符串 | 时间戳列值的格式字符串。 适用于 csv(DataFrameWriter)。 |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] |
时间戳格式字符串 | 设置时间戳的字符串格式,不带时区 (TimestampNTZType) 列值。 |
Excel
编写Excel文件时,以下选项适用。
| 密钥 | 默认 | 有效值 | 说明 |
|---|---|---|---|
dataAddress |
None | 工作表名称或单元格引用字符串 | 写入的工作表名称或起始单元格。 如果省略,则写入从单元格Sheet1开始命名A1的工作表。 接受工作表名称(SheetName)或单个单元格引用(SheetName!A1)。 写入不支持单元格区域。 |
dateFormatInWrite |
yyyy-mm-dd |
Excel日期格式字符串 | 应用于 Date 列的单元格格式字符串Excel。 使用Excel格式语法。 |
headerRows |
0 |
0、1 |
是否将列名写入第一行。 |
timestampNTZFormat |
yyyy-mm-dd hh:mm:ss |
Excel时间戳格式字符串 | 应用于 TimestampNTZ 和 Timestamp 列的单元格格式字符串Excel。 使用Excel格式语法。 |
version |
xlsx |
xlsx、xls |
要写入的Excel文件格式版本。 |
JSON
编写 JSON 文件时,以下选项适用。
| 密钥 | 默认 | 有效值 | 说明 |
|---|---|---|---|
compression |
none |
none、bzip2、gzip、lz4、snappy、deflate、zstd |
编写时要使用的压缩编解码器。 适用于 json (DataFrameWriter)。 |
dateFormat |
yyyy-MM-dd |
日期格式字符串 | 日期列值的格式字符串。 适用于 json (DataFrameWriter)。 |
encoding |
UTF-8 |
名称java.nio.charset.Charset |
输出文件的字符编码。 适用于 json (DataFrameWriter)。 |
ignoreNullFields |
值 spark.sql.jsonGenerator.ignoreNullFields |
true、false |
是否省略 JSON 输出中具有 null 值的字段。 适用于 json (DataFrameWriter)。 |
lineSep |
\n |
一个字符串 | 记录之间使用的行分隔符字符串。 适用于 json (DataFrameWriter)。 |
locale |
en-US |
标识符java.util.Locale |
影响 JSON 中默认日期、时间戳和十进制分析的Java区域设置标识符。 |
pretty |
false |
true、false |
是否启用漂亮的(缩进、多行)JSON 输出。 |
sortKeys |
false |
true、false |
是否按字母顺序对输出中的 JSON 对象的键进行排序。 可用于生成确定性输出。 |
timestampFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] |
时间戳格式字符串 | 时间戳列值的格式字符串。 适用于 json (DataFrameWriter)。 |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] |
时间戳格式字符串 | 设置时间戳的字符串格式,不带时区 (TimestampNTZType) 列值。 |
writeNonAsciiCharacterAsCodePoint |
false |
true、false |
是否将非 ASCII 字符编码为 \uXXXX Unicode 转义序列,而不是输出中的文本 UTF-8 字符。 |
ORC
编写 ORC 文件时,以下选项适用。
| 密钥 | 默认 | 有效值 | 说明 |
|---|---|---|---|
compression |
zstd |
none、uncompressed、snappy、zlib、lzo、zstd、lz4、brotli |
编写时要使用的压缩编解码器。 适用于 orc(DataFrameWriter)。 |
Parquet
编写 Parquet 文件时,以下选项适用。
| 密钥 | 默认 | 有效值 | 说明 |
|---|---|---|---|
compression |
snappy |
none、uncompressed、snappy、gzip、lzo、brotli、lz4、lz4_raw、zstd |
编写时要使用的压缩编解码器。 适用于 parquet(DataFrameWriter)。 |
spark.sql.parquet.outputTimestampType |
INT96 |
INT96、TIMESTAMP_MICROS、TIMESTAMP_MILLIS |
用于对时间戳列进行编码的物理类型。 用于 INT96 与不支持标准时间戳类型的旧 Parquet 读取器兼容。 |
文本
编写文本文件时,以下选项适用。
| 密钥 | 默认 | 有效值 | 说明 |
|---|---|---|---|
compression |
none |
none、bzip2、gzip、lz4、snappy、deflate、zstd |
编写时要使用的压缩编解码器。 适用于文本(DataFrameWriter)。 |
encoding |
UTF-8 |
名称java.nio.charset.Charset |
输出文件的字符编码。 |
lineSep |
\n |
一个字符串 | 记录之间使用的行分隔符字符串。 适用于文本(DataFrameWriter)。 |
XML
编写 XML 文件时,以下选项适用。
| 密钥 | 默认 | 有效值 | 说明 |
|---|---|---|---|
arrayElementName |
item |
任意字符串 | 没有显式名称的数组元素的元素名称。 适用于 xml(DataFrameWriter)。 |
attributePrefix |
_ |
任意字符串 | 前面追加到与 XML 属性对应的字段名称的前缀。 适用于 xml(DataFrameWriter)。 |
compression |
none |
none、bzip2、gzip、lz4、snappy、deflate、zstd |
编写时要使用的压缩编解码器。 适用于 xml(DataFrameWriter)。 |
dateFormat |
yyyy-MM-dd |
日期格式字符串 | 日期列值的格式字符串。 适用于 xml(DataFrameWriter)。 |
declaration |
version="1.0" encoding="UTF-8" standalone="yes" |
要取消的 XML 声明字符串或空字符串 | 在每个输出文件的顶部写入的 XML 声明字符串。 设置为空字符串以禁止声明。 适用于 xml(DataFrameWriter)。 |
encoding |
UTF-8 |
名称java.nio.charset.Charset |
输出文件的字符编码。 适用于 xml(DataFrameWriter)。 |
indent |
4 个空格 | 任意字符串 | 用于缩进输出中的子元素的字符串。 设置为空字符串以关闭缩进,并在单个行上写入每行。 |
locale |
en-US |
标识符java.util.Locale |
影响 XML 中的默认日期、时间戳和小数格式的Java区域设置标识符。 |
nullValue |
null |
任意字符串 | 为 null 值编写的字符串。 如果设置为 nullnull 字段,则省略 null 字段的属性和子元素。 适用于 xml(DataFrameWriter)。 |
rootTag |
ROWS |
任意字符串 | 包装输出中所有行元素的根元素标记。 适用于 xml(DataFrameWriter)。 |
rowTag |
ROW |
任意字符串 | 表示输出中的行的元素标记。 适用于 xml(DataFrameWriter)。 |
singleVariantColumn |
None | 列名字符串 | 要写入 XML 文件的单个 Variant 列的名称。 适用于 xml(DataFrameWriter)。 |
timestampFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] |
时间戳格式字符串 | 时间戳列值的格式字符串。 适用于 xml(DataFrameWriter)。 |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] |
时间戳格式字符串 | 设置时间戳的字符串格式,不带时区列值。 适用于 xml(DataFrameWriter)。 |
validateName |
true |
true、false |
如果列名不是有效的 XML 元素标识符,是否引发异常。 适用于 xml(DataFrameWriter)。 |
valueTag |
_VALUE |
任意字符串 | 用于 XML 元素中也具有属性或子元素的字符数据的字段名称。 适用于 xml(DataFrameWriter)。 |
DataStreamWriter 选项
使用这些选项 DataStreamWriter.option() 来配置流式写入。
Example
以下示例设置流的检查点位置:
Python
(df.writeStream
.format("delta")
.option("checkpointLocation", "/path/to/checkpoint")
.start("/path/to/table"))
Scala
df.writeStream
.format("delta")
.option("checkpointLocation", "/path/to/checkpoint")
.start("/path/to/table")
Common
以下选项适用于所有流式写入操作。
| 密钥 | 默认 | 有效值 | 说明 |
|---|---|---|---|
checkpointLocation |
无(必需) | 路径字符串 | 流式处理查询的检查点目录的路径。 容错和恰好一次的处理保证是必需的。 每个流式处理查询都必须使用唯一的检查点位置。 Databricks 建议将检查点存储在 Unity 目录卷或云存储路径中。 请参阅结构化流式处理检查点。 |
path |
None | 路径字符串 | 基于文件的流式处理接收器(如 Parquet)的输出路径。 仅适用于基于文件的格式。 |
控制台接收器
将流写入控制台接收器时,以下选项适用。
| 密钥 | 默认 | 有效值 | 说明 |
|---|---|---|---|
numRows |
20 |
正整数 | 写入控制台接收器时,要为每个微批处理显示的行数。 |
truncate |
true |
true、false |
是否在显示行时截断长字符串。 设置为 false 显示完整字符串值。 |
Delta Lake
使用 format("delta") 将流写入 Delta Lake 表时,以下选项适用。 仅覆盖选项,例如overwriteSchemareplaceWhere,不支持partitionOverwriteMode流式写入。
| 密钥 | 默认 | 有效值 | 说明 |
|---|---|---|---|
mergeSchema |
false |
true、false |
流式处理数据帧包含新列时是否要改进 Delta Lake 表架构。 仅适用于追加输出模式。 适用于 更新表架构。 |
userMetadata |
None | 任意字符串 | 追加到写入操作的提交元数据中的用户定义的字符串。 在 . 的 DESCRIBE HISTORY输出中可见 适用于 使用自定义元数据扩充表。 |
文件存储端
将流写入基于文件的格式(Parquet、JSON、CSV、ORC、text)时,适用以下选项。 有关特定于格式的选项,请参阅 DataFrameWriter 选项。
| 密钥 | 默认 | 有效值 | 说明 |
|---|---|---|---|
retention |
None | 时间字符串,例如 7 days 或 24 hours |
保留用于容错和压缩的接收器元数据文件的时长。 如果未设置,元数据文件将无限期保留。 |
Kafka 接收器
写入 Kafka 时,以下选项适用。
| 密钥 | 默认 | 有效值 | 说明 |
|---|---|---|---|
kafka.bootstrap.servers |
None | 字符串的 host:port 逗号分隔列表 |
必填。 Kafka 中转站 host:port 地址的逗号分隔列表。 |
topic |
None | 任意字符串 | 所有行的目标 Kafka 主题。 如果 DataFrame 不包含列, topic 则为必需。 |
kafka.* |
None | 任何 Kafka 生成者配置 值 | 任何带有前缀的 kafka.。 例如,kafka.compression.type。 |
内存接收器
将流写入内存接收器时,以下选项适用。
| 密钥 | 默认 | 有效值 | 说明 |
|---|---|---|---|
queryName |
无(必需) | 任意字符串 | 查询写入的内存中表的名称。 内存接收器是必需的。 也可以通过 .queryName(). 进行配置。 |
mode |
exactlyonce |
exactlyonce、atleastonce |
内存接收器的传递保证。
exactlyonce 使用具有完全一次语义的微批处理模式。
atleastonce 使用具有至少一次语义的连续模式。 |
Spark 函数选项
某些 Spark SQL 内置函数接受控制 options 分析或序列化行为的映射。 将选项作为 Python dict 或 Scala Map[String, String] 传递。
Example
以下示例在删除格式不正确的记录时分析 JSON 列:
Python
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType
schema = StructType([StructField("name", StringType())])
df = df.withColumn("parsed", from_json("json_col", schema, {"mode": "DROPMALFORMED"}))
Scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
val schema = StructType(Seq(StructField("name", StringType)))
val df = df.withColumn("parsed", from_json(col("json_col"), schema, Map("mode" -> "DROPMALFORMED")))
Avro
Avro 函数接受与相应的数据帧选项相同的选项:
-
from_avro并使用schema_of_avroDataFrameReader Avro 选项。 -
to_avro使用 DataFrameWriter Avro 选项。
Example
以下示例解码启用了架构演变的 Avro 列:
Python
from pyspark.sql.functions import from_avro
df = df.withColumn("decoded", from_avro("avro_col", json_schema, {"avroSchemaEvolutionMode": "restart"}))
Scala
import org.apache.spark.sql.avro.functions.from_avro
val df = df.withColumn("decoded", from_avro(col("avro_col"), jsonSchema, Map("avroSchemaEvolutionMode" -> "restart")))
此外,架构注册表变体和from_avroto_avro接受以下选项:
| 密钥 | 默认 | 有效值 | 说明 |
|---|---|---|---|
schemaId |
None | 架构 ID 整数 | 从 Confluent 架构注册表中解码与架构不兼容 jsonFormatSchema的 Avro 数据时要使用的架构 ID。
from_avro仅适用于。 |
confluent.schema.registry.* |
None | 任何 Confluent SR 客户端属性值 | Confluent 架构注册表客户端配置属性。 使用此前缀传递任何 Confluent SR 客户端属性,例如 confluent.schema.registry.basic.auth.user.info 基本身份验证凭据。 架构注册表变体 from_avro 和 to_avro. |
CSV
CSV 函数接受与相应的数据帧选项相同的选项:
-
from_csv并使用schema_of_csvDataFrameReader CSV 选项。 -
to_csv使用 DataFrameWriter CSV 选项。
Example
以下示例使用自定义分隔符和 NULL 值读取 CSV:
Python
from pyspark.sql.functions import from_csv
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([StructField("id", IntegerType()), StructField("name", StringType())])
df = df.withColumn("parsed", from_csv("csv_col", schema, {"sep": "|", "nullValue": "N/A"}))
Scala
import org.apache.spark.sql.functions.from_csv
import org.apache.spark.sql.types._
val schema = StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
val df = df.withColumn("parsed", from_csv(col("csv_col"), schema, Map("sep" -> "|", "nullValue" -> "N/A")))
JSON
JSON 函数接受与相应的 DataFrame 选项相同的选项:
-
from_json并使用schema_of_jsonDataFrameReader JSON 选项。 -
to_json使用 DataFrameWriter JSON 选项。
Example
以下示例编写 JSON,其中 NULL 字段被忽略并启用了漂亮的格式设置:
Python
from pyspark.sql.functions import to_json
df = df.withColumn("json_str", to_json("struct_col", {"pretty": "true", "ignoreNullFields": "true"}))
Scala
import org.apache.spark.sql.functions.to_json
val df = df.withColumn("json_str", to_json(col("struct_col"), Map("pretty" -> "true", "ignoreNullFields" -> "true")))
Protobuf
from_protobuf 不使用 to_protobuf 基于文件的 DataSource。 Protobuf 数据始终使用这些函数读取和写入为二进制列。 选项以区分 Map[String, String] 大小写的形式传递。
Example
以下示例使用 PERMISSIVE 模式解码 Protobuf 列:
Python
from pyspark.sql.functions import from_protobuf
df = df.withColumn("decoded", from_protobuf("proto_col", "MyMessage", "/path/to/descriptor.desc",
{"mode": "PERMISSIVE", "enums.as.ints": "true"}))
Scala
import org.apache.spark.sql.protobuf.functions.from_protobuf
val df = df.withColumn("decoded", from_protobuf(col("proto_col"), "MyMessage", "/path/to/descriptor.desc",
Map("mode" -> "PERMISSIVE", "enums.as.ints" -> "true")))
Protobuf 函数使用以下选项:
XML
XML 函数接受与相应的 DataFrame 选项相同的选项:
-
from_xml并使用schema_of_xmlDataFrameReader XML 选项。 -
to_xml使用 DataFrameWriter XML 选项。
Example
以下示例使用自定义根标记和行标记编写 XML:
Python
from pyspark.sql.functions import to_xml
df = df.withColumn("xml_str", to_xml("struct_col", {"rootTag": "records", "rowTag": "record"}))
Scala
import org.apache.spark.sql.functions.to_xml
val df = df.withColumn("xml_str", to_xml(col("struct_col"), Map("rootTag" -> "records", "rowTag" -> "record")))