此页面列出了用于读取和写入数据的 Spark API 的可用输入和输出选项。
DataFrameReader 选项
将这些选项用于 DataFrameReader.option()、DataFrameReader.options()、read_files、COPY INTO 和 Auto Loader 来控制Azure Databricks读取数据文件的方式。
Example
以下示例设置为multiLineTrue读取 JSON 文件:
Python
df = spark.read.format("json").option("multiLine", True).load("/path/to/data")
Scala
val df = spark.read.format("json").option("multiLine", "true").load("/path/to/data")
SQL
SELECT * FROM read_files("/path/to/data", format => "json", multiLine => true)
Common
以下选项适用于所有文件格式。
| 密钥 | 默认 | 说明 |
|---|---|---|
ignoreCorruptFiles |
false |
是否忽略损坏的文件。 如果为 true,则当遇到损坏的文件时,Spark 作业将继续运行,并且仍会返回已读取的内容。 因此COPY INTO,可以观察已跳过的文件,numSkippedCorruptFiles如 operationMetrics Delta Lake 历史记录列中所示。 在 Databricks Runtime 11.3 LTS 及更高版本中可用。 |
ignoreMissingFiles |
false 用于自动加载程序( trueCOPY INTO 旧版) |
是否忽略缺少的文件。 如果为 true,则 Spark 作业在遇到缺少的文件且仍返回内容时继续运行。 在 Databricks Runtime 11.3 LTS 及更高版本中可用。 |
modifiedAfter |
None | 可选时间戳作为筛选器,用于仅引入在提供的时间戳之后具有修改时间戳的文件。 |
modifiedBefore |
None | 可选时间戳作为筛选器,用于仅引入在提供的时间戳之前具有修改时间戳的文件。 |
pathGlobFilter 或 fileNamePattern |
None | 提供用来选择文件的一种潜在 glob 模式。 等效于 PATTERN ( COPY INTO 旧版)。
fileNamePattern 可在 read_files 中使用。 |
recursiveFileLookup |
false |
当此选项搜索嵌套目录时 true,即使它们的名称不遵循分区命名方案(例如 date=2019-07-01)。 |
Avro
| 密钥 | 默认 | 说明 |
|---|---|---|
avroSchema |
None | 用户以 Avro 格式提供的可选架构。 读取 Avro 时,此选项可以设置为兼容但不同于实际 Avro 架构的不断发展架构。 反序列化架构与不断发展的架构一致。 例如,如果设置一个具有默认值的其他列的演变架构,则读取结果也包含新列。 |
avroSchemaEvolutionMode |
none |
如何使用架构注册表处理架构演变。 有效值: none (忽略架构更改并继续作业), restart (检测到架构更改时,引发 UnknownFieldException 并要求重启作业)。 |
datetimeRebaseMode |
LEGACY |
控制 DATE 和 TIMESTAMP 值在儒略历与外推格里历之间的基本值重定。 有效值:EXCEPTION、LEGACY 和 CORRECTED。 |
enableStableIdentifiersForUnionType |
false |
是否对 Avro Union 类型使用稳定的字段名称。 启用后,联合类型字段名称派生自其小写类型名称(例如, member_int) member_string。 如果两个类型名称在下限后相同,则引发异常。 |
mergeSchema |
false |
是否在多个文件中推断模式并合并每个文件的模式。 Avro 的 mergeSchema 不放宽数据类型。 |
mode |
FAILFAST |
用于处理损坏记录的分析器模式。 有效值: FAILFAST (引发异常)、 PERMISSIVE (将格式不正确的字段设置为 null)、 DROPMALFORMED (无提示地删除错误的记录)。 |
readerCaseSensitive |
true |
指定启用 rescuedDataColumn 时区分大小写的行为。 如果为 true,请拯救名称不同于架构的数据列。 如果为 false,则以不区分大小写的方式读取数据。 |
recursiveFieldMaxDepth |
None | 递归 Avro 字段的最大递归深度。 设置为1截断所有递归字段,2以允许一级递归等。15 取消设置或 0不允许递归字段。 有效值: 0 到 15. |
rescuedDataColumn |
None | 是否将因数据类型不匹配和架构不匹配(包括列大小写)而无法分析的所有数据收集到单独的列。 使用自动加载程序时,默认包含此列。COPY INTO(旧版)不支持已获救的数据列,因为无法使用 COPY INTO手动设置架构。 Databricks 建议对大多数引入方案使用自动加载程序。有关更多详细信息,请参考什么是已恢复的数据列?。 |
stableIdentifierPrefixForUnionType |
member_ |
用于稳定联合类型字段名称的 enableStableIdentifiersForUnionType=true前缀。 |
CSV
| 密钥 | 默认 | 说明 |
|---|---|---|
badRecordsPath |
None | 用于记录错误 CSV 记录信息的文件存储路径。 |
charToEscapeQuoteEscaping |
\0 |
用来对引号的转义字符进行转义的字符。 例如,对于以下记录:[ " a\\", b ]:
|
columnNameOfCorruptRecord |
_corrupt_record |
支持自动加载程序。 不支持 COPY INTO(旧版)。用于存储因格式不正确而无法分析的记录的列。 如果用于分析的 mode 设置为 DROPMALFORMED,则此列将为空。 |
comment |
\0 |
定义表示行注释的字符(位于文本行的开头时)。 请使用 '\0' 来禁用注释跳过。 |
dateFormat |
yyyy-MM-dd |
用于分析日期字符串的格式。 |
emptyValue |
空字符串 | 空值的字符串表示形式。 |
enableDateTimeParsingFallback |
false |
当不能使用指定格式分析值时,是否回退到旧日期和时间时间戳分析行为。 当 false,分析失败时会引发错误或生成 null, mode具体取决于。 |
encoding 或 charset |
UTF-8 |
CSV 文件编码的名称。 有关选项列表,请参阅 java.nio.charset.Charset。 当 UTF-16 为 UTF-32 时,不能使用 multiline 和 true。 |
enforceSchema |
true |
是否将指定的或推理出的架构强制应用于 CSV 文件。 如果启用此选项,则会忽略 CSV 文件的标题。 默认情况下,当使用自动加载程序来补救数据并允许架构演变时,会忽略此选项。 |
escape |
\ |
分析数据时要使用的转义字符。 |
extension |
csv |
预期的文件扩展名。 读取期间会筛选出没有此扩展名的文件。 |
failOnUnknownFields |
false |
CSV 记录是否包含架构中不存在的列时是否失败。 何时 false,无法识别的列将静默删除或救援, rescuedDataColumn具体取决于。 |
failOnWidenedFields |
false |
如果字段值在未扩大的情况下无法解析为声明的架构类型,则是否失败。 当 false,类型扩大的值会根据情况 rescuedDataColumn以无提示方式进行救援。 设置 failOnUnknownFields=true 可以屏蔽此选项的效果。 |
header |
false |
CSV 文件是否包含标题。 自动加载程序在推理架构时会假定文件具有标题。 |
ignoreLeadingWhiteSpace |
false |
是否忽略每个所分析值的前导空格。 |
ignoreTrailingWhiteSpace |
false |
是否忽略每个解析值的尾随空格。 |
inferSchema |
false |
是推断所解析的 CSV 记录的数据类型,还是假定所有列都是 StringType 类型的。 如果设置为 true,则需要对数据进行另一轮操作。 对于自动加载程序,请改用 cloudFiles.inferColumnTypes。 |
inputBufferSize |
1048576 (1 MB) |
CSV 分析程序缓冲区大小(以字节为单位)。 用于在分析大型 CSV 文件时优化内存使用情况。 有效值:正整数。 |
lineSep |
无,涵盖 \r、 \r\n和 \n |
两个连续 CSV 记录之间的字符串。 |
locale |
US |
一个 java.util.Locale 标识符。 影响 CSV 中的默认日期、时间戳和十进制分析。 |
maxCharsPerColumn |
-1 |
预期要解析的值的最大字符数。 可用于避免内存错误。 默认为 -1,表示无限制。 有效值:正整数或 -1 无限制。 |
maxColumns |
20480 |
记录可以包含的列数的硬限制。 有效值:正整数。 |
mergeSchema |
false |
是否在多个文件中推断模式并合并每个文件的模式。 已默认在推理架构时为自动加载程序启用。 |
mode |
PERMISSIVE |
围绕处理格式错误的记录提供的分析程序模式。 有效值:PERMISSIVE、、DROPMALFORMEDFAILFAST. |
multiLine |
false |
CSV 记录是否跨多行。 |
nanValue |
NaN |
分析 FloatType 和 DoubleType 列时非数字值的字符串表示形式。 |
negativeInf |
-Inf |
分析 FloatType 或 DoubleType 列时负无穷大的字符串表示形式。 |
nullValue |
空字符串 | null 值的字符串表示形式。 |
parserCaseSensitive(已弃用) |
false |
读取文件时,将标题中声明的列与架构对齐时是否区分大小写。 对于自动加载程序,此项默认为 true。 如果启用,则会在 rescuedDataColumn 中补救大小写不同的列。 出于对 readerCaseSensitive 的偏好,已不推荐使用此选项。 |
positiveInf |
Inf |
分析 FloatType 或 DoubleType 列时正无穷大的字符串表示形式。 |
preferDate |
true |
如果可能,尝试将字符串推断为日期而不是时间戳。 此外,还必须通过启用 inferSchema 或使用 cloudFiles.inferColumnTypes 自动加载程序来使用架构推理。 |
quote |
" |
当字段分隔符是值的一部分时用于对值进行转义的字符。 |
readerCaseSensitive |
true |
指定启用 rescuedDataColumn 时区分大小写的行为。 如果为 true,请拯救名称不同于架构的数据列。 如果为 false,则以不区分大小写的方式读取数据。 |
rescuedDataColumn |
None | 是否将因数据类型不匹配和架构不匹配(包括列大小写)而无法分析的所有数据收集到单独的列。 使用自动加载程序时,默认包含此列。 有关更多详细信息,请参考什么是已恢复的数据列?。COPY INTO(旧版)不支持已获救的数据列,因为无法使用 COPY INTO手动设置架构。 Databricks 建议对大多数引入方案使用自动加载程序。 |
sep 或 delimiter |
, |
列之间的分隔符字符串。 |
singleVariantColumn |
None | 当设置为列名时,将整个 CSV 记录读入具有该名称的单个 VariantType 列,而不是将每个字段分析为其自己的列。 需要 header=true。 |
skipRows |
0 |
CSV 文件开头应忽略的行数(包括注释行和空行)。 如果 header 为 true,则标头将是第一个未跳过和未注释的行。 有效值:正整数或 0。 |
timeFormat |
HH:mm:ss |
分析 TimeType 列值的格式。 |
timestampFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] |
用于分析时间戳字符串的格式。 |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] |
不带时区 (TimestampNTZType) 字符串的时间戳分析格式。 |
timeZone |
None | 分析时间戳和日期时要使用的 java.time.ZoneId。 |
unescapedQuoteHandling |
STOP_AT_DELIMITER |
用于处理未转义的引号的策略。 允许的选项:
|
Excel
| 密钥 | 默认 | 说明 |
|---|---|---|
dataAddress |
None | 要以 Excel 语法读取的单元格区域。 如果省略,则从第一个工作表读取所有有效单元格。 用于 "SheetName!C5:H10" 从命名工作表读取范围、 "C5:H10" 从第一个工作表读取区域或 "SheetName" 从特定工作表读取所有数据。 |
headerRows |
0 |
用作列名标题的初始行数。 指定后 dataAddress ,这将在单元格范围内应用。 当0,列名称将自动生成为_c1,_c2_c3等等。有效值: 0, 。 1 |
operation |
readSheet |
对Excel工作簿执行的操作。 有效值: readSheet (从工作表读取数据), listSheets (返回包含字段 sheetIndex: long 和 sheetName: String 每个工作表的结构)。 |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] |
在Excel中存储为字符串的时间戳无时区值的自定义格式字符串。 自定义日期格式遵循 Datetime 模式中的格式。 |
dateFormat |
yyyy-MM-dd |
读取为字符串 Date值的自定义格式字符串。 自定义日期格式遵循 Datetime 模式中的格式。 |
JSON
| 密钥 | 默认 | 说明 |
|---|---|---|
allowBackslashEscapingAnyCharacter |
false |
是否允许反斜杠对其后面的任何字符进行转义。 如果未启用,则只能对按 JSON 规范显式列出的字符进行转义。 |
allowComments |
false |
是否允许在分析的内容中使用 Java、C 和 C++ 样式的注释('/'、'*' 和 '//' 变体)。 |
allowNonNumericNumbers |
true |
是否允许将非数字 (NaN) 标记集用作合法浮点数字值。 |
allowNumericLeadingZeros |
false |
是否允许整数以附加的(可忽略的)零开头(例如 000001)。 |
allowSingleQuotes |
true |
是否允许使用单引号(撇号字符 '\')来引用字符串(名称和字符串值)。 |
allowUnquotedControlChars |
false |
是否允许 JSON 字符串包含未转义的控制字符(值小于 32 的 ASCII 字符,包括制表符和换行符)。 |
allowUnquotedFieldNames |
false |
是否允许使用 JavaScript 允许的未引用字段名称,但不允许由 JSON 规范使用。 |
alternateVariantEncoding |
None | 用于源 JSON 中 Variant 值的编码。 设置为 Z85 解码 Base85 编码的 Variant 值,而不是存储为内联 JSON。 |
badRecordsPath |
None | 用于存储记录错误 JSON 信息的文件路径。 在基于文件的数据源中使用 badRecordsPath选项有以下限制:
|
columnNameOfCorruptRecord |
_corrupt_record |
用于存储因格式不正确而无法分析的记录的列。 如果用于分析的 mode 设置为 DROPMALFORMED,则此列将为空。 |
dateFormat |
yyyy-MM-dd |
用于分析日期字符串的格式。 |
dropFieldIfAllNull |
false |
在进行架构推理期间是否忽略所有 null 值或空数组和结构的列。 |
encoding 或 charset |
UTF-8 |
JSON 文件编码的名称。 有关选项列表,请参阅 java.nio.charset.Charset。 当 UTF-16 为 UTF-32 时,不能使用 multiline 和 true。 |
inferTimestamp |
false |
是否尝试将时间戳字符串推理为 TimestampType。 设置为 true架构推理时,架构推理可能需要明显更长的时间。 必须启用 cloudFiles.inferColumnTypes 才能与自动加载程序一起使用。 |
lineSep |
无,涵盖 \r、 \r\n和 \n |
两个连续 JSON 记录之间的字符串。 |
locale |
US |
一个 java.util.Locale 标识符。 影响 JSON 中的默认日期、时间戳和十进制分析。 |
maxNestingDepth |
500 |
JSON 对象和数组允许的最大嵌套深度。 为深度嵌套文档增加此值。 有效值:正整数。 |
maxNumLen |
1000 |
JSON 输入中数字令牌的最大长度。 为包含大数值文本的 JSON 增加此值。 有效值:正整数。 |
maxStringLen |
不限制 | JSON 输入中字符串值的最大长度。 设置为使用大型字符串分析 JSON 时限制内存使用量。 有效值:正整数。 |
mode |
PERMISSIVE |
围绕处理格式错误的记录提供的分析程序模式。 有效值:PERMISSIVE、、DROPMALFORMEDFAILFAST. |
multiLine |
false |
JSON 记录是否跨多行。 |
prefersDecimal |
false |
如果可能,尝试将字符串推断为 DecimalType 而不是浮点型或双精度型。 此外,还必须通过启用 inferSchema 或使用 cloudFiles.inferColumnTypes 自动加载程序来使用架构推理。 |
primitivesAsString |
false |
是否将数字和布尔值等基元类型推理为 StringType。 |
readerCaseSensitive |
true |
指定启用 rescuedDataColumn 时区分大小写的行为。 如果为 true,请拯救名称不同于架构的数据列。 如果为 false,则以不区分大小写的方式读取数据。 在 Databricks Runtime 13.3 及更高版本中可用。 |
rescuedDataColumn |
None | 是否将因数据类型不匹配或架构不匹配(包括列大小写)而无法分析的所有数据收集到单独的列。 使用自动加载程序时,默认包含此列。 有关更多详细信息,请参阅什么是已恢复的数据列?COPY INTO(旧版)不支持已获救的数据列,因为无法使用 COPY INTO手动设置架构。 Databricks 建议对大多数引入方案使用自动加载程序。 |
singleVariantColumn |
None | 是否引入整个 JSON 文档,分析为具有指定字符串的单个 Variant 列作为列的名称。 如果未设置,JSON 字段将引入其自己的列。 有效值:任何字符串。 |
timestampFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] |
用于分析时间戳字符串的格式。 |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] |
不带时区 (TimestampNTZType) 字符串的时间戳分析格式。 |
timeZone |
None | 分析时间戳和日期时要使用的 java.time.ZoneId。 |
upgradeExceptionAsBadRecord |
false |
是否将类型升级异常(例如,当值不能扩大为声明的列类型时)视为错误的记录,而不是引发异常。 |
ORC
| 密钥 | 默认 | 说明 |
|---|---|---|
mergeSchema |
false |
是否在多个文件中推断模式并合并每个文件的模式。 |
Parquet
| 密钥 | 默认 | 说明 |
|---|---|---|
datetimeRebaseMode |
LEGACY |
控制 DATE 和 TIMESTAMP 值在儒略历与外推格里历之间的基本值重定。 有效值:EXCEPTION、LEGACY 和 CORRECTED。 |
int96RebaseMode |
LEGACY |
控制 INT96 时间戳值在儒略历与外推格里历之间的基本值重定。 有效值:EXCEPTION、LEGACY 和 CORRECTED。 |
mergeSchema |
false |
是否在多个文件中推断模式并合并每个文件的模式。 |
readerCaseSensitive |
true |
指定启用 rescuedDataColumn 时区分大小写的行为。 如果为 true,请拯救名称不同于架构的数据列。 如果为 false,则以不区分大小写的方式读取数据。 |
rescuedDataColumn |
None | 是否将因数据类型不匹配和架构不匹配(包括列大小写)而无法分析的所有数据收集到单独的列。 使用自动加载程序时,默认包含此列。 有关更多详细信息,请参考什么是已恢复的数据列?。COPY INTO(旧版)不支持已获救的数据列,因为无法使用 COPY INTO手动设置架构。 Databricks 建议对大多数引入方案使用自动加载程序。 |
文本
| 密钥 | 默认 | 说明 |
|---|---|---|
encoding |
UTF-8 |
TEXT 文件行分隔符的编码的名称。 有关选项列表,请参阅 java.nio.charset.Charset。 文件的内容不受此选项的影响,仍会读取as-is。 |
lineSep |
无,涵盖\r\r\n和\n |
两个连续 TEXT 记录之间的字符串。 |
wholeText |
false |
是否将文件读取为单个记录。 |
XML
| 密钥 | 默认 | 说明 |
|---|---|---|
rowTag |
None | 要作为行处理的 XML 文件的行标记。 在示例 XML <books> <book><book>...<books> 中,相应的值为 book。 这是必需选项。 |
samplingRatio |
1.0 |
定义用于架构推理的行的一部分。 XML 内置函数会忽略此选项。 有效值: 0.0 到 1.0. |
excludeAttribute |
false |
是否排除元素中的属性。 |
mode |
None | 在解析过程中处理损坏的记录的一种模式。
PERMISSIVE:对于损坏的记录,将格式错误的字符串放入由 columnNameOfCorruptRecord 配置的字段中,并将格式错误的字段设置为 null。 若要保留损坏的记录,可以在用户定义的架构中设置名为 string 的 columnNameOfCorruptRecord 类型字段。 如果架构没有该字段,则会在分析期间删除损坏的记录。 推理架构时,分析程序会在输出架构中隐式添加 columnNameOfCorruptRecord 字段。
DROPMALFORMED:忽略损坏的记录。 XML 内置函数不支持此模式。
FAILFAST:分析程序遇到损坏的记录时引发异常。 |
inferSchema |
true |
如果为 true,则尝试推断每个生成的 DataFrame 列的相应类型。 如果为 false,则生成的所有列均为 string 类型。 XML 内置函数会忽略此选项。 |
columnNameOfCorruptRecord |
spark.sql.columnNameOfCorruptRecord |
允许重命名包含模式 PERMISSIVE 创建的格式不正确的字符串的新字段。 |
attributePrefix |
None | 属性的前缀,用于区分属性和元素。 这将是字段名称的前缀。 默认值为 _。 读取 XML 时可以为空,但写入时不能为空。 也适用于 DataFrameWriter XML 选项。 |
valueTag |
_VALUE |
该标记用于同时具有属性元素或子元素的元素中的字符数据。 用户可以在架构中指定 valueTag 字段,或者当字符数据存在于具有其他元素或属性的元素中时,该字段将在架构推断期间自动添加。 也适用于 DataFrameWriter XML 选项。 |
encoding |
UTF-8 |
若要读取,请根据给定的编码类型解码 XML 文件。 对于写入,请指定已保存的 XML 文件的编码(字符集)。 XML 内置函数会忽略此选项。 也适用于 DataFrameWriter XML 选项。 |
ignoreSurroundingSpaces |
true |
是否必须跳过围绕值的空格。 将忽略只有空格的字符数据。 |
rowValidationXSDPath |
None | 可选 XSD 文件的路径,用于单独验证每行的 XML。 未能验证的行被视为分析错误。 XSD 不会影响架构,无论是提供还是推断。 |
ignoreNamespace |
false |
如果为 true,则忽略 XML 元素和属性上的命名空间前缀。 例如,标记 <abc:author> 和 <def:author> 会被视为只是 <author>。 不能忽略 rowTag 元素上的命名空间,只忽略其读取子元素。 XML 解析不识别命名空间,即使在有 false 的情况下。 |
timestampFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] |
遵循 日期时间模式 格式的自定义时间戳格式字符串。 它适用于 timestamp 类型。 也适用于 DataFrameWriter XML 选项。 |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] |
遵循日期/时间模式格式的不带时区的自定义时间戳格式字符串。 这适用于 TimestampNTZType 类型。 也适用于 DataFrameWriter XML 选项。 |
dateFormat |
yyyy-MM-dd |
遵循 日期时间模式 格式的自定义日期格式字符串。 这适用于日期类型。 也适用于 DataFrameWriter XML 选项。 |
locale |
en-US |
将地区设置为 IETF BCP 47 格式的语言标签。 例如,在分析日期和时间戳时使用 locale。 |
nullValue |
字符串 null |
设置 null 值的字符串表示形式。 当为 null 时,分析程序不会为字段写入属性和元素。 也适用于 DataFrameWriter XML 选项。 |
readerCaseSensitive |
true |
指定启用 rescuedDataColumn 时区分大小写的行为。 如果为 true,请拯救名称不同于架构的数据列。 如果为 false,则以不区分大小写的方式读取数据。 |
rescuedDataColumn |
None | 是否将因数据类型不匹配和架构不匹配(包括列大小写)而无法解析的所有数据收集到单独的列中。 使用自动加载程序时,默认包含此列。 有关更多详细信息,请参阅什么是被恢复的数据列?
COPY INTO(旧版)不支持已获救的数据列,因为无法使用 COPY INTO手动设置架构。 Databricks 建议对大多数引入方案使用自动加载程序。 |
singleVariantColumn |
none |
指定单个变体列的名称。 如果指定此选项用于读取,将整个 XML 记录解析为一个单个 Variant 列,并将该选项字符串值作为列名称。 如果为写入提供了此选项,则将单个 Variant 列的值写入 XML 文件。 也适用于 DataFrameWriter XML 选项。 |
useLegacyXMLParser |
true |
是否使用旧版 XML 分析程序。 旧版分析程序对格式不正确的内容进行了不太严格的验证,但内存效率较低。
false设置为选择使用更严格的默认分析程序。 |
wildcardColName |
xs_any |
用于捕获与通配符 (xs:any) 架构元素匹配的 XML 元素的列名。 不能与 rescuedDataColumn.一起使用。 |
DataStreamReader 选项
使用这些选项 DataStreamReader.option() 配置 Delta Lake 表和其他基于文件的源的流式读取。
有关文件格式选项(JSON、CSV、Parquet 等),请参阅 DataFrameReader 选项。
有关自动加载程序(cloudFiles.*)选项,请参阅 自动加载程序。
Example
以下示例设置为 maxFilesPerTrigger10 Delta Lake 表流:
Python
df = spark.readStream.format("delta").option("maxFilesPerTrigger", 10).load("/path/to/delta-table")
Scala
val df = spark.readStream.format("delta").option("maxFilesPerTrigger", "10").load("/path/to/delta-table")
Common
以下选项适用于 Delta Lake 表和其他基于文件的流式处理源。
| 密钥 | 默认 | 说明 |
|---|---|---|
cleanSource |
off |
如何在流处理源文件后处理源文件。 有效值: off (无操作)、 delete (永久删除源文件)、 archive (移动到 sourceArchiveDir)。
archive设置为时,sourceArchiveDir还必须设置。 不适用于 Delta Lake 表流式处理。 |
fileNameOnly |
false |
是否仅按文件名而不是完整路径标识已处理的文件。 当 true,具有相同文件名的不同路径的文件将被视为同一文件,并且不会重新处理。 不适用于 Delta Lake 表流式处理。 |
latestFirst |
false |
是否在每个微批处理中首先处理最近修改的文件。 在希望尽快处理最新数据时非常有用。 当和maxFilesPerTrigger或maxBytesPerTrigger已设置时true,maxFileAge将忽略。 不适用于 Delta Lake 表流式处理。 |
maxBytesPerTrigger |
None | 每个微批处理的数据量的软最大值。 如果最小输入单位超过限制,批处理可能会处理超过限制。 一起使用 maxFilesPerTrigger时,微批处理会处理数据,直到达到任何一个限制。 有效值:正整数。对于自动加载程序,请改用 cloudFiles.maxBytesPerTrigger。 请参阅 “常见”。 |
maxCachedFiles |
10000 |
要缓存后续微批处理的未处理文件的最大数量。 设置为 0 关闭缓存。 当源目录包含每个触发器的许多新文件时增加此值。 不适用于 Delta Lake 表流式处理。 有效值:正整数或 0。 |
maxFileAge |
7d |
考虑处理的文件的最大期限,相对于最近修改的文件的时间戳,而不是当前系统时间。 忽略超过此阈值的文件。 接受持续时间字符串,例如 7d 或 4h。 在设置true和maxFilesPerTrigger设置maxBytesPerTrigger时latestFirst被忽略。 不适用于 Delta Lake 表流式处理。 |
maxFilesPerTrigger |
1000 用于 Delta Lake 和自动加载程序。 对于其他基于文件的源,没有最大值。 |
每个微批处理中处理的新文件数上限。 一起使用 maxBytesPerTrigger时,微批处理会处理数据,直到达到任何一个限制。 有效值:正整数。对于自动加载程序,请改用 cloudFiles.maxFilesPerTrigger。 请参阅 “常见”。 |
sourceArchiveDir |
None | 设置为 archive 时cleanSource存档目录的路径。 源文件在处理后移动到此路径,保留其相对目录结构。 不适用于 Delta Lake 表流式处理。 |
自动加载器
将这些选项与源配合使用 cloudFiles ,为从云存储进行流式引入配置 自动加载程序 。 特定于 cloudFiles 源的选项带有 cloudFiles 前缀,以将它们保留在与其他 结构化流 源选项不同的命名空间中。
Common
| 密钥 | 默认 | 说明 |
|---|---|---|
cloudFiles.allowOverwrites |
false |
是否允许输入目录文件更改替代现有数据。 有关配置注意事项,请参阅自动加载程序在追加或覆盖文件时是否再次处理文件? |
cloudFiles.backfillInterval |
None | 自动加载程序可以按给定间隔触发异步回填。 例如 1 day ,每天回填或 1 week 每周回填。 有关详细信息,请参阅 使用 cloudFiles.backfillInterval 触发常规回填。请勿在 cloudFiles.useManagedFileEvents 被设置为 true 时使用。 |
cloudFiles.cleanSource |
OFF |
是否自动从输入目录中删除已处理的文件。 设置为 OFF (默认值)时,不会删除任何文件。设置为 DELETE时,自动加载程序会在处理文件 30 天后自动删除文件。 为此,自动加载程序必须具有对源目录的写入权限。设置为 MOVE 后,自动加载程序将在文件被 cloudFiles.cleanSource.moveDestination 处理后 30 天内自动移动到指定位置。 为此,自动加载程序必须对源目录以及移动位置具有写入权限。当文件在表值函数的结果 commit_time中具有非 null 值cloud_files_state时,会将其视为已处理。 请参阅 cloud_files_state 表值函数。 可以使用 cloudFiles.cleanSource.retentionDuration配置处理后的 30 天额外等待时间。启用 cloudFiles.cleanSource之前,请查看以下注意事项:
在 Databricks Runtime 16.4 及更高版本中可用。 |
cloudFiles.cleanSource.retentionDuration |
30 days |
在已处理文件成为通过 cleanSource 进行存档的候选文件之前需等待的时间。 对于 DELETE 操作来说必须长于 7 天。 对于 MOVE 操作来说没有最低限制。该值为 CalendarInterval 字符串。 例如, "14 days"、"30 days"、"2 weeks" 或 "1 month"。在 Databricks Runtime 16.4 及更高版本中可用。 |
cloudFiles.cleanSource.moveDestination |
None | 当 cloudFiles.cleanSource 设置为 MOVE 时已处理文件的存档路径。 这可以是云存储路径或 Unity 目录卷 路径(例如 /Volumes/my_catalog/my_schema/my_volume/archive/)。移动位置必须:
自动加载程序必须对此目录具有写入权限。 在 Databricks Runtime 16.4 及更高版本中可用。 |
cloudFiles.format |
无(必需选项) | 源路径中的数据文件格式。 有效值包括: |
cloudFiles.includeExistingFiles |
true |
是包含流式处理输入路径中的现有文件,还是仅处理初始设置后到达的新文件。 仅在您首次启动流时会对该选项进行评估。 在重启流后更改此选项不起作用。 |
cloudFiles.inferColumnTypes |
false |
在利用架构推理时是否推断确切的列类型。 默认情况下,在推断 JSON 和 CSV 数据集时,列将被推断为字符串。 有关更多详细信息,请参阅架构推理。 |
cloudFiles.maxBytesPerTrigger |
None | 每次触发时要处理的最大新字节数。 你可以指定一个字节字符串(例如 10g),将每个微批限制为 10 GB 数据。 这是一个软性最大值。 如果每个文件为 3 GB,则 Azure Databricks 在一个微批中可以处理 12 GB。 与 cloudFiles.maxFilesPerTrigger 一起使用时,Azure Databricks 会消耗到 cloudFiles.maxFilesPerTrigger 或 cloudFiles.maxBytesPerTrigger 中的最低限制(以先到达者为准)。 与 Trigger.Once()(Trigger.Once() 已弃用)一起使用时,此选项不起作用。在 Databricks Runtime 18.0 及更高版本中,此选项是动态配置的,不需要手动设置。 |
cloudFiles.maxFileAge |
None | 出于重复数据删除目的而跟踪文件事件的时长。 Databricks 建议不要优化此参数,除非你是在以每小时数百万个文件的速度引入数据。 有关更多详细信息,请参阅 文件事件跟踪 部分。 过于激进地调整 cloudFiles.maxFileAge 可能会导致数据质量问题,例如重复引入或缺少文件。 因此,Databricks 建议为 cloudFiles.maxFileAge 使用保守设置,例如 90 天,这与类似数据引入解决方案建议的值相当。 |
cloudFiles.maxFilesPerTrigger |
1000 |
要在每个触发器中处理的最大新文件数。 与 cloudFiles.maxBytesPerTrigger 一起使用时,Azure Databricks 会消耗到 cloudFiles.maxFilesPerTrigger 或 cloudFiles.maxBytesPerTrigger 中的最低限制(以先到达者为准)。 与 Trigger.Once()(已弃用)一起使用时,此选项不起作用。在 Databricks Runtime 18.0 及更高版本中,此选项是动态配置的,不需要手动设置。 |
cloudFiles.partitionColumns |
None | 要从文件的目录结构推断的 Hive 样式分区列的逗号分隔列表。 Hive 样式的分区列是由等号(例如 <base-path>/a=x/b=1/c=y/file.format)组合的键值对。 在此示例中,分区列为 a、b 和 c。 默认情况下,如果使用架构推理并提供 <base-path> 从中加载数据,这些列会自动添加到架构中。 如果提供架构,则自动加载程序会期望这些列包含在架构中。 如果你不希望这些列成为架构的一部分,则可以指定 "" 以忽略这些列。 此外,当你希望在复杂目录结构中通过文件路径推断列时,可以使用此选项,如下面的示例所示:<base-path>/year=2022/week=1/file1.csv<base-path>/year=2022/month=2/day=3/file2.csv<base-path>/year=2022/month=2/day=4/file3.csv将 cloudFiles.partitionColumns 指定为 year,month,day 会针对 year=2022 返回 file1.csv,但 month 和 day 列为 null。month 和 day 已正确解析为 file2.csv 和 file3.csv。 |
cloudFiles.schemaEvolutionMode |
addNewColumns 如果未提供架构, none 则为 |
在数据中发现新列时对架构进行演变的模式。 默认情况下,在推断 JSON 数据集时,将列推断为字符串。 有关更多详细信息,请参阅架构演变。 |
cloudFiles.schemaHints |
None | 在架构推理期间向自动加载程序提供的架构信息。 有关更多详细信息,请参阅架构提示。 |
cloudFiles.schemaLocation |
无(推断架构所需的) | 存储推断出的架构和后续更改的位置。 有关更多详细信息,请参阅架构推理。 |
cloudFiles.useStrictGlobber |
false |
是否使用与 Apache Spark 中其他文件源的默认通配行为相匹配的严格通配符。 有关更多详细信息,请参阅常见数据加载模式。 在 Databricks Runtime 12.2 LTS 及更高版本中可用。 |
cloudFiles.validateOptions |
true |
是否对自动加载器选项进行验证,并对未知或不一致的选项返回错误。 |
目录列表
| 密钥 | 默认 | 说明 |
|---|---|---|
cloudFiles.useIncrementalListing(已弃用) |
auto 在 Databricks Runtime 17.2 及更低版本中, false 在 Databricks Runtime 17.3 及更高版本上 |
此功能已弃用。 Databricks 建议将 文件通知模式用于文件事件 而不是使用 cloudFiles.useIncrementalListing。是否在目录列表模式下使用增量列表而不是完整列表。 默认情况下,自动加载程序尽最大努力自动检测给定目录是否适用于增量列表。 可以显式使用增量列表,或者通过将完整目录列表分别设置为 true 或 false 来使用该列表。错误地在非按词汇排序的目录上启用增量列表会阻止自动加载程序发现新文件。 适用于 Azure Data Lake Storage( abfss://)、S3(s3://)和 GCS(gs://)。在 Databricks Runtime 9.1 LTS 及更高版本中可用。 可用值: auto、true、false |
文件通知
有关配置文件通知模式(包括所需的云权限、设置说明和身份验证方法)的信息,请参阅 在文件通知模式下配置自动加载程序流。
| 密钥 | 默认 | 说明 |
|---|---|---|
cloudFiles.fetchParallelism |
1 |
从队列服务中提取消息时要使用的线程数。 请勿在 cloudFiles.useManagedFileEvents 被设置为 true 时使用。 |
cloudFiles.pathRewrites |
None | 仅当指定接收 queueUrl 来自多个 S3 存储桶的文件通知并且想要使用配置为访问这些容器中的数据的装入点时,才是必需的。 借助此选项可以使用装入点重写 bucket/key 路径的前缀。 只能重写前缀。 例如,对于配置 {"<databricks-mounted-bucket>/path": "dbfs:/mnt/data-warehouse"},路径 s3://<databricks-mounted-bucket>/path/2017/08/fileA.json 将重写为 dbfs:/mnt/data-warehouse/2017/08/fileA.json。请勿在 cloudFiles.useManagedFileEvents 被设置为 true 时使用。 |
cloudFiles.resourceTag |
None | 一系列键值标记对,可帮助关联和识别相关资源,例如:cloudFiles.option("cloudFiles.resourceTag.myFirstKey", "myFirstValue") .option("cloudFiles.resourceTag.mySecondKey", "mySecondValue")有关 AWS 的更多信息,请参阅 Amazon SQS 成本分配标记和为 Amazon SNS 主题配置标记。 (1) 有关 Azure 的详细信息,请参阅命名队列和元数据以及 properties.labels中的覆盖范围。 自动加载程序将这些键值标记对以 JSON 格式存储为标签。
(1)有关 GCP 的更多信息,请参阅使用标签报告使用情况。 (1) 请勿在 cloudFiles.useManagedFileEvents 被设置为 true 时使用。 而是使用云提供程序控制台设置资源标记。 |
cloudFiles.useManagedFileEvents |
false |
设置为 true时,Auto Loader 使用文件事件服务来检测外部位置中的文件。 仅当加载路径位于启用了文件事件的外部位置时,才能使用此选项。 请参阅 将文件通知模式与文件事件配合使用。文件事件在文件发现中提供通知级性能,因为自动加载程序可以在上次运行后发现新文件。 与目录列表不同,此过程不需要列出目录中的所有文件。 在某些情况下,即使启用了文件事件选项,自动加载程序也使用目录列表:
在 Databricks Runtime 14.3 LTS 及更高版本中可用。 |
cloudFiles.listOnStart |
false |
设置为 CF_MANAGED_FILE_EVENTS_INVALID_CONTINUATION_TOKEN中恢复。 请参阅如何从CF_MANAGED_FILE_EVENTS_INVALID_CONTINUATION_TOKEN错误中恢复? |
cloudFiles.useNotifications |
false |
是否使用文件通知模式来确定何时存在新文件。 如果为 false,则使用目录列表模式。 请参阅比较自动加载器文件检测模式。请勿在 cloudFiles.useManagedFileEvents 被设置为 true 时使用。 |
(1) 默认情况下,自动加载程序会尽力添加以下键值标记对:
-
vendor:Databricks -
path:从中加载数据的位置。 由于标签限制,在 GCP 中不可用。 -
checkpointLocation:流检查点的位置。 由于标签限制,在 GCP 中不可用。 -
streamId:流的全局唯一标识符。
Databricks 保留这些密钥名称,并且无法覆盖其值。
特定于云
自动加载程序提供用于为文件通知模式配置云基础结构的选项。 有关所需的云权限和设置说明,请参阅 在文件通知模式下配置自动加载程序流。
AWS
仅当选择 cloudFiles.useNotifications = true 并且希望自动加载程序为你设置通知服务时,才提供以下选项:
| 密钥 | 默认 | 说明 |
|---|---|---|
cloudFiles.region |
EC2 实例的区域 | 源 S3 存储桶所在的区域以及要在其中创建 AWS SNS 和 SQS 服务的区域。 |
| 密钥 | 默认 | 说明 |
|---|---|---|
cloudFiles.restrictNotificationSetupToSameAWSAccountId |
false |
仅允许来自与 SNS 主题位于同一账户的 AWS S3 存储桶的事件通知。 如果为 true,自动加载程序仅接受与 SNS 主题相同的帐户中的 AWS S3 存储桶的事件通知。 当 false,访问策略不会限制跨帐户存储桶和 SNS 主题设置。 当 SNS 主题和存储桶路径与不同的帐户关联时,这非常有用。在 Databricks Runtime 17.2 及更高版本中可用。 |
仅当你选择 cloudFiles.useNotifications = true 并希望自动加载程序使用已设置的队列时,才提供以下选项:
| 密钥 | 默认 | 说明 |
|---|---|---|
cloudFiles.queueUrl |
None | SQS 队列的 URL。 如果提供了此项,则自动加载程序会直接使用此队列中的事件,而不是设置自己的 AWS SNS 和 SQS 服务。 |
AWS 身份验证选项
提供以下身份验证选项以使用 Databricks 服务凭据:
| 密钥 | 默认 | 说明 |
|---|---|---|
databricks.serviceCredential |
None | 您的 Databricks 服务凭据的名称。 在 Databricks Runtime 16.1 及更高版本中可用。 |
当 Databricks 服务凭据或 IAM 角色不可用时,可以改为提供以下身份验证选项:
| 密钥 | 默认 | 说明 |
|---|---|---|
cloudFiles.awsAccessKey |
None | 用户的 AWS 访问密钥 ID。 必须与 cloudFiles.awsSecretKey 一起提供。 |
cloudFiles.awsSecretKey |
None | 用户的 AWS 机密访问密钥。 必须与 cloudFiles.awsAccessKey 一起提供。 |
cloudFiles.roleArn |
None | 要担任的 IAM 角色的 ARN(如果需要)。 可以通过提供凭据来承担角色,该凭据可以从群集的实例配置文件或cloudFiles.awsAccessKeycloudFiles.awsSecretKey中获取。 |
cloudFiles.roleExternalId |
None | 使用 cloudFiles.roleArn 担任角色时提供的标识符。 |
cloudFiles.roleSessionName |
None | 使用 cloudFiles.roleArn 担任角色时使用的可选会话名称。 |
cloudFiles.stsEndpoint |
None | 一个可选终结点,用于在使用 cloudFiles.roleArn 担任角色时访问 AWS STS。 |
蔚蓝
如果指定 cloudFiles.useNotifications = true,并且希望自动加载程序设置通知服务,则必须为以下所有选项提供值:
| 密钥 | 默认 | 说明 |
|---|---|---|
cloudFiles.resourceGroup |
None | 在其中创建存储帐户的Azure资源组。 |
cloudFiles.subscriptionId |
None | 在其中创建资源组的Azure订阅 ID。 |
databricks.serviceCredential |
None | 您的 Databricks 服务凭据的名称。 在 Databricks Runtime 16.1 及更高版本中可用。 |
如果 Databricks 服务凭据不可用,可以改为提供以下身份验证选项:
| 密钥 | 默认 | 说明 |
|---|---|---|
cloudFiles.clientId |
None | 服务主体的客户端 ID 或应用程序 ID。 |
cloudFiles.clientSecret |
None | 服务主体的客户端密钥。 |
cloudFiles.connectionString |
None | 存储帐户的连接字符串,基于帐户访问密钥或共享访问签名 (SAS)。 |
cloudFiles.tenantId |
None | 在其中创建服务主体的Azure租户 ID。 |
仅当设置 cloudFiles.useNotifications = true 并且希望自动加载程序使用现有队列时,才提供以下选项:
| 密钥 | 默认 | 说明 |
|---|---|---|
cloudFiles.queueName |
None | Azure 队列的名称。 如果提供了此项,则云文件源会直接使用此队列中的事件,而不是设置自己的 Azure 事件网格和队列存储服务。 在这种情况下,databricks.serviceCredential 或 cloudFiles.connectionString 只需要对队列具有读取权限。 |
GCP
自动加载程序可以利用 Databricks 服务凭据自动为你设置通知服务。 使用 Databricks 服务凭据创建的服务帐户将需要 在文件通知模式下配置自动加载程序流中指定的权限。
| 密钥 | 默认 | 说明 |
|---|---|---|
cloudFiles.projectId |
None | GCS 存储桶位于的项目 ID。 Google Cloud Pub/Sub 订阅也在此项目中创建。 |
databricks.serviceCredential |
None | 您的 Databricks 服务凭据的名称。 在 Databricks Runtime 16.1 及更高版本中可用。 |
如果 Databricks 服务凭据不可用,可以直接使用 Google 服务帐户。 您可以通过按照 Google 服务设置 的步骤将群集配置为使用服务帐户,或者直接提供以下身份验证选项:
| 密钥 | 默认 | 说明 |
|---|---|---|
cloudFiles.client |
None | Google 服务帐户的客户端 ID。 |
cloudFiles.clientEmail |
None | Google 服务帐户的电子邮件地址。 |
cloudFiles.privateKey |
None | 为 Google 服务帐户生成的私钥。 |
cloudFiles.privateKeyId |
None | 为 Google 服务帐户生成的私钥的 ID。 |
仅当你选择 cloudFiles.useNotifications = true 并希望自动加载程序使用已设置的队列时,才提供以下选项:
| 密钥 | 默认 | 说明 |
|---|---|---|
cloudFiles.subscription |
None | Google Cloud Pub/Sub 订阅的名称。 如果提供此选项,则云文件源将使用此队列中的事件,而不是设置自身的 GCS 通知和 Google Cloud Pub/Sub 服务。 |
Delta Lake
使用以下 spark.readStream选项从 Delta Lake 表读取时适用。
| 密钥 | 默认 | 说明 |
|---|---|---|
allowSourceColumnDrop |
None | 设置为 Delta 表版本号或 "always" 允许流在从源表架构中删除列后继续。 当设置为版本号时,确认所有架构更改都达到该版本。 需要 schemaTrackingLocation。 请参阅 有关使用 Delta Lake 列映射重命名和删除列的说明。 |
allowSourceColumnRename |
None | 设置为 Delta 表版本号或 "always" 允许流在源表中重命名列后继续。 当设置为版本号时,确认所有架构更改都达到该版本。 需要 schemaTrackingLocation。 请参阅 有关使用 Delta Lake 列映射重命名和删除列的说明。 |
allowSourceColumnTypeChange |
None | 设置为 Delta 表版本号或 "always" 允许流在源表中更改列类型后继续。 当设置为版本号时,确认所有架构更改都达到该版本。 需要 schemaTrackingLocation。 请参阅类型扩展。 |
excludeRegex |
None | 正则表达式模式。 与模式匹配的文件从流读取中排除。 用于筛选不符合预期命名约定的文件。 |
failOnDataLoss |
true |
如果源数据由于日志保留()而删除了流式处理查询,logRetentionDuration是否失败。 设置为 false 跳过缺失的数据并继续处理。 请参阅为“按时间顺序查看”查询配置数据保留。 |
ignoreChanges(已弃用) |
false |
在 Databricks Runtime 11.3 LTS 和更低版本中可用。 在修改操作(如UPDATE、MERGE INTODELETE或OVERWRITE)后重新发出重写数据文件。 可以与新行一起发出未更改的行,因此下游使用者必须处理重复项。 删除操作不会传播到下游。 替换为 skipChangeCommits Databricks Runtime 12.2 LTS 及更高版本。 |
ignoreDeletes(已弃用) |
false |
忽略在分区边界处删除数据的事务(仅删除完整分区)。 不处理非分区删除、更新或其他修改。 请改用 skipChangeCommits。 |
readChangeFeed 或 readChangeData |
false |
是否启用读取流式处理查询的更改数据馈送。 启用后,流会发出包含其他元数据列的行级更改(插入、更新和删除)。 请参阅 在 Azure Databricks 上使用 Delta Lake 更改数据流。 |
schemaTrackingLocation |
None | Delta Lake 跟踪流读取的架构更改的目录的路径。 当从启用了列映射的表进行流式处理以及使用 allowSourceColumn* 选项处理架构演变时是必需的。 必须位于 checkpointLocation 流式处理查询中。 请参阅 有关使用 Delta Lake 列映射重命名和删除列的说明。 |
skipChangeCommits |
false |
忽略删除或修改现有记录和进程仅追加的事务。 Databricks 建议对不使用更改数据馈送的大多数工作负荷使用此选项。 在 Databricks Runtime 12.2 LTS 及更高版本中可用。 请参阅 使用 跳过上游更改提交 skipChangeCommits。 |
startingTimestamp |
最新可用 | 要从中读取的时间戳。 该流读取指定时间戳或之后提交的所有表更改。 如果时间戳位于所有可用表提交之前,则流从最早的可用提交开始。 不能与 startingVersion.一起使用。 如果流式处理检查点已存在,则忽略。接受时间戳字符串,例如 "2019-01-01T00:00:00.000Z" 或日期字符串,例如 "2019-01-01"。 |
startingVersion |
最新可用 | 要从中读取的增量表版本。 该流读取指定版本或之后提交的所有更改。 指定 "latest" 仅从最近的更改开始。 不能与 startingTimestamp.一起使用。 如果流式处理检查点已存在,则忽略。 参见 使用表历史记录。 |
withEventTimeOrder |
false |
将初始表快照划分为事件时间桶,以防止记录被错误地标记为后期事件,并在带有水印的有状态查询中丢弃。 在开始初始快照处理后无法更改,而无需删除检查点。 在 Databricks Runtime 11.3 LTS 及更高版本中可用。 请参阅 “处理初始快照而不删除数据”。 |
DataFrameWriter 选项
将这些选项用于 DataFrameWriter.option() 和 DataFrameWriterV2.option()来控制Azure Databricks写入数据的方式。
Example
以下示例设置为mergeSchemaTrue写入 Delta Lake 表:
Python
df.write.format("delta").option("mergeSchema", True).saveAsTable("my_table")
Scala
df.write.format("delta").option("mergeSchema", "true").saveAsTable("my_table")
Avro
| 密钥 | 默认 | 说明 |
|---|---|---|
avroSchema |
None | 完整的 Avro 架构作为 JSON 字符串。 使用此选项可将 Spark SQL 类型转换为特定的 Avro 类型。 适用于 Avro 文件。 |
avroSchemaUrl |
None | 指向 Avro 架构文件的 URL。 使用,而不是 avroSchema 在外部存储架构时使用。 与 avroSchema 互斥。 适用于 Avro 文件。 |
compression |
snappy |
编写时要使用的压缩编解码器。 有效值:uncompressed、、deflate、snappybzip2、xz。 zstandard 适用于 Avro 文件。 |
recordName |
topLevelRecord |
输出 Avro 架构中的顶级记录名称。 适用于 Avro 文件。 |
positionalFieldMatching |
false |
是否按字段位置而不是按名称匹配 Spark 架构和 Avro 架构之间的列。 适用于 Avro 文件。 |
recordNamespace |
空字符串 | 输出 Avro 架构中顶级记录的命名空间。 适用于 Avro 文件。 |
Delta Lake 和 Apache Iceberg
| 密钥 | 默认 | 说明 |
|---|---|---|
clusterByAuto |
false |
是否启用自动液体聚类分析,其中Azure Databricks根据查询模式选择聚类分析列。 仅对 . 有效。mode("overwrite") 不能与模式一起使用 append 。 在 Databricks Runtime 16.4 及更高版本中可用。 适用于 对表使用液体聚类分析。 |
mergeSchema |
None | 是否为写入操作启用架构演变。 源 DataFrame 中的新列将添加到目标表架构。 适用于批处理和流式处理追加。 适用于 更新表架构。 |
overwriteSchema |
None | 是否在覆盖时替换表架构和分区。 需要 mode("overwrite") 没有 replaceWhere。 不能与 partitionOverwriteMode 一起使用。 适用于 更新表架构。 |
partitionOverwriteMode |
None | 分区覆盖模式。 将其设置为 dynamic 仅覆盖包含新数据的分区,使所有其他分区保持不变。 旧模式,在无服务器计算或 Databricks SQL 上不受支持。 有效值: static、 dynamic。 适用于 选择性地使用 Delta Lake 覆盖数据。 |
replaceOn |
None | 一个布尔表达式,与目标表中的行匹配,以替换为源查询中的行。 可以引用目标表和源查询中的列。 将删除并替换与源行匹配的目标中的行。 如果源为空,则不会发生删除操作。 用于 targetAlias 消除列引用的歧义。 在 Databricks Runtime 17.1 及更高版本中可用。 适用于 选择性地使用 Delta Lake 覆盖数据。 |
replaceUsing |
None | 用于匹配目标表和源查询之间的行的列名的逗号分隔列表。 目标和源必须包含所有列出的列。 在相等比较下与源行匹配的目标中的行将被删除并替换。
NULL 值被视为不相等且不匹配。 在 Databricks Runtime 16.3 及更高版本中可用。 适用于 选择性地使用 Delta Lake 覆盖数据。 |
replaceWhere |
None | 谓词表达式。 以原子方式仅覆盖与谓词匹配的记录。 适用于 选择性地使用 Delta Lake 覆盖数据。 |
targetAlias |
None | 目标表的字符串别名。
replaceOn当条件同时引用目标表和源查询中的列时,请使用或replaceWhere消除列引用的歧义。 适用于 选择性地使用 Delta Lake 覆盖数据。 |
txnAppId |
None | 用于标识应用程序中幂等写入 foreachBatch 操作的应用程序的唯一字符串。 与 一 txnVersion 起使用,确保对多个 Delta Lake 表进行恰好一次的写入。
foreachBatch适用于幂等表写入。 |
txnVersion |
None | 单调增加的数字,用作操作中幂等写入的 foreachBatch 事务版本。 与 一 txnAppId 起使用,确保对多个 Delta Lake 表进行恰好一次的写入。
foreachBatch适用于幂等表写入。 |
optimizeWrite |
None | 是否为此写入操作启用自动优化写入。 重写 spark.databricks.delta.optimizeWrite.enabled 配置。 适用于 Azure Databricks? 中的 Delta Lake 是什么。 |
userMetadata |
None | 追加到写入操作的提交元数据中的用户定义的字符串。 在 . 的 DESCRIBE HISTORY输出中可见 适用于 使用自定义元数据扩充表。 |
CSV
| 密钥 | 默认 | 说明 |
|---|---|---|
charToEscapeQuoteEscaping |
\0 (未启用) |
与引号字符不同时,用于转义转义字符的字符。 适用于 csv(DataFrameWriter)。 |
compression |
none |
编写时要使用的压缩编解码器。 有效值:none、、bzip2、gziplz4、snappy、deflate。 zstd 适用于 csv(DataFrameWriter)。 |
dateFormat |
yyyy-MM-dd |
日期列值的格式字符串。 适用于 csv(DataFrameWriter)。 |
emptyValue |
空字符串 | 为空(非 null)值编写的字符串。 适用于 csv(DataFrameWriter)。 |
encoding |
UTF-8 |
输出文件的字符编码。 适用于 csv(DataFrameWriter)。 |
escape |
\ |
用于转义带引号值的字符。 适用于 csv(DataFrameWriter)。 |
escapeQuotes |
true |
是否转义带引号字段值内的引号字符。 适用于 csv(DataFrameWriter)。 |
header |
false |
是否将列名称写入输出的第一行。 适用于 csv(DataFrameWriter)。 |
ignoreLeadingWhiteSpace |
false |
是否在写入时从值中剪裁前导空格。 适用于 csv(DataFrameWriter)。 |
ignoreTrailingWhiteSpace |
false |
是否在写入时从值中剪裁尾随空格。 适用于 csv(DataFrameWriter)。 |
lineSep |
\n |
记录之间使用的行分隔符字符串。 适用于 csv(DataFrameWriter)。 |
locale |
en-US |
一个 java.util.Locale 标识符。 影响写入时的日期和时间时间戳值的格式设置。 |
nullValue |
空字符串 | 为 null 值编写的字符串。 适用于 csv(DataFrameWriter)。 |
quote |
" |
用于引用包含分隔符的字段值的字符。 适用于 csv(DataFrameWriter)。 |
quoteAll |
false |
是否将所有字段值括在引号中,而不考虑内容。 适用于 csv(DataFrameWriter)。 |
sep |
, |
字段分隔符字符。 适用于 csv(DataFrameWriter)。 |
timestampFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] |
时间戳列值的格式字符串。 适用于 csv(DataFrameWriter)。 |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] |
设置时间戳的字符串格式,不带时区 (TimestampNTZType) 列值。 |
Excel
| 密钥 | 默认 | 说明 |
|---|---|---|
dataAddress |
None | 写入的工作表名称或起始单元格。 如果省略,则写入从单元格A1开始命名Sheet1的工作表。 接受工作表名称("SheetName")或单个单元格引用("SheetName!A1")。 写入不支持单元格区域。 |
dateFormatInWrite |
yyyy-mm-dd |
应用于 Date 列的单元格格式字符串Excel。 使用Excel格式语法。 |
headerRows |
0 |
是否将列名写入第一行。 有效值: 0、 1。 |
timestampNTZFormat |
yyyy-mm-dd hh:mm:ss |
应用于 TimestampNTZ 和 Timestamp 列的单元格格式字符串Excel。 使用Excel格式语法。 |
version |
xlsx |
要写入的Excel文件格式版本。 有效值: xlsx、 xls。 |
JSON
| 密钥 | 默认 | 说明 |
|---|---|---|
compression |
none |
编写时要使用的压缩编解码器。 有效值:none、、bzip2、gziplz4、snappy、deflate。 zstd 适用于 json (DataFrameWriter)。 |
dateFormat |
yyyy-MM-dd |
日期列值的格式字符串。 适用于 json (DataFrameWriter)。 |
encoding |
UTF-8 |
输出文件的字符编码。 适用于 json (DataFrameWriter)。 |
ignoreNullFields |
值 spark.sql.jsonGenerator.ignoreNullFields |
是否省略 JSON 输出中具有 null 值的字段。 适用于 json (DataFrameWriter)。 |
lineSep |
\n |
记录之间使用的行分隔符字符串。 适用于 json (DataFrameWriter)。 |
locale |
en-US |
一个 java.util.Locale 标识符。 影响写入时的日期和时间时间戳值的格式设置。 |
pretty |
false |
是否启用漂亮的(缩进、多行)JSON 输出。 |
sortKeys |
false |
是否按字母顺序对输出中的 JSON 对象的键进行排序。 可用于生成确定性输出。 |
timestampFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] |
时间戳列值的格式字符串。 适用于 json (DataFrameWriter)。 |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] |
设置时间戳的字符串格式,不带时区 (TimestampNTZType) 列值。 |
writeNonAsciiCharacterAsCodePoint |
false |
是否将非 ASCII 字符编码为 \uXXXX Unicode 转义序列,而不是输出中的文本 UTF-8 字符。 |
ORC
| 密钥 | 默认 | 说明 |
|---|---|---|
compression |
zstd |
编写时要使用的压缩编解码器。 有效值:none、、uncompressed、snappy、zliblzo、zstd、 lz4。 brotli 适用于 orc(DataFrameWriter)。 |
Parquet
| 密钥 | 默认 | 说明 |
|---|---|---|
compression |
snappy |
编写时要使用的压缩编解码器。 有效值:none、、uncompressed、snappy、lzogzip、brotlilz4、lz4_raw。 zstd 适用于 parquet(DataFrameWriter)。 |
spark.sql.parquet.outputTimestampType |
INT96 |
用于对时间戳列进行编码的物理类型。 有效值:INT96、、TIMESTAMP_MICROSTIMESTAMP_MILLIS. 用于 INT96 与不支持标准时间戳类型的旧 Parquet 读取器兼容。 |
文本
| 密钥 | 默认 | 说明 |
|---|---|---|
compression |
none |
编写时要使用的压缩编解码器。 有效值:none、、bzip2、gziplz4、snappy、deflate。 zstd 适用于文本(DataFrameWriter)。 |
encoding |
UTF-8 |
输出文件的字符编码。 |
lineSep |
\n |
记录之间使用的行分隔符字符串。 适用于文本(DataFrameWriter)。 |
XML
| 密钥 | 默认 | 说明 |
|---|---|---|
arrayElementName |
item |
没有显式名称的数组元素的元素名称。 适用于 xml(DataFrameWriter)。 |
attributePrefix |
_ |
前面追加到与 XML 属性对应的字段名称的前缀。 适用于 xml(DataFrameWriter)。 |
compression |
none |
编写时要使用的压缩编解码器。 有效值:none、、bzip2、gziplz4、snappy、deflate。 zstd 适用于 xml(DataFrameWriter)。 |
dateFormat |
yyyy-MM-dd |
日期列值的格式字符串。 适用于 xml(DataFrameWriter)。 |
declaration |
version="1.0" encoding="UTF-8" standalone="yes" |
在每个输出文件的顶部写入的 XML 声明字符串。 设置为空字符串以禁止声明。 适用于 xml(DataFrameWriter)。 |
encoding |
UTF-8 |
输出文件的字符编码。 适用于 xml(DataFrameWriter)。 |
indent |
4 个空格 | 用于缩进输出中的子元素的字符串。 设置为空字符串以关闭缩进,并在单个行上写入每行。 |
locale |
en-US |
一个 java.util.Locale 标识符。 影响写入时的日期和时间时间戳值的格式设置。 |
nullValue |
null |
为 null 值编写的字符串。 如果设置为 nullnull 字段,则省略 null 字段的属性和子元素。 适用于 xml(DataFrameWriter)。 |
rootTag |
ROWS |
包装输出中所有行元素的根元素标记。 适用于 xml(DataFrameWriter)。 |
rowTag |
ROW |
表示输出中的行的元素标记。 适用于 xml(DataFrameWriter)。 |
singleVariantColumn |
None | 要写入 XML 文件的单个 Variant 列的名称。 适用于 xml(DataFrameWriter)。 |
timestampFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] |
时间戳列值的格式字符串。 适用于 xml(DataFrameWriter)。 |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] |
设置时间戳的字符串格式,不带时区列值。 适用于 xml(DataFrameWriter)。 |
validateName |
true |
如果列名不是有效的 XML 元素标识符,是否引发异常。 适用于 xml(DataFrameWriter)。 |
valueTag |
_VALUE |
用于 XML 元素中也具有属性或子元素的字符数据的字段名称。 适用于 xml(DataFrameWriter)。 |
DataStreamWriter 选项
使用这些选项 DataStreamWriter.option() 来配置流式写入。
Example
以下示例设置流的检查点位置:
Python
(df.writeStream
.format("delta")
.option("checkpointLocation", "/path/to/checkpoint")
.start("/path/to/table"))
Scala
df.writeStream
.format("delta")
.option("checkpointLocation", "/path/to/checkpoint")
.start("/path/to/table")
Common
| 密钥 | 默认 | 说明 |
|---|---|---|
checkpointLocation |
无(必需) | 流式处理查询的检查点目录的路径。 容错和恰好一次的处理保证是必需的。 每个流式处理查询都必须使用唯一的检查点位置。 Databricks 建议将检查点存储在 Unity 目录卷或云存储路径中。 请参阅结构化流式处理检查点。 |
path |
None | 基于文件的流式处理接收器(如 Parquet)的输出路径。 仅适用于基于文件的格式。 |
控制台接收器
| 密钥 | 默认 | 说明 |
|---|---|---|
numRows |
20 |
写入控制台接收器时要显示的每个微批处理的行数。 |
truncate |
true |
是否在显示行时截断长字符串。 设置为 false 显示完整字符串值。 |
Delta Lake
使用 format("delta") 将流写入 Delta Lake 表时,以下选项适用。 仅覆盖选项,例如overwriteSchemareplaceWhere,不支持partitionOverwriteMode流式写入。
| 密钥 | 默认 | 说明 |
|---|---|---|
mergeSchema |
false |
流式处理数据帧包含新列时是否要改进 Delta Lake 表架构。 仅适用于追加输出模式。 适用于 更新表架构。 |
userMetadata |
None | 追加到写入操作的提交元数据中的用户定义的字符串。 在 . 的 DESCRIBE HISTORY输出中可见 适用于 使用自定义元数据扩充表。 |
文件存储端
将流写入基于文件的格式(Parquet、JSON、CSV、ORC、text)时,适用以下选项。 有关特定于格式的选项,请参阅 DataFrameWriter 选项。
| 密钥 | 默认 | 说明 |
|---|---|---|
retention |
None | 保留用于容错和压缩的接收器元数据文件的时长。 接受时间字符串,例如 7 days 或 24 hours。 如果未设置,元数据文件将无限期保留。 |
Kafka 接收器
有关将流写入 Kafka 的选项的完整列表,请参阅 “选项”。
| 密钥 | 默认 | 说明 |
|---|---|---|
kafka.bootstrap.servers |
None | 必填。 Kafka 中转站 host:port 地址的逗号分隔列表。 |
topic |
None | 所有行的目标 Kafka 主题。 如果 DataFrame 不包含列, topic 则为必需。 |
kafka.* |
None | 任何带有前缀的 kafka.Kafka 生成者配置。 例如,kafka.compression.type。 |
内存接收器
| 密钥 | 默认 | 说明 |
|---|---|---|
queryName |
无(必需) | 查询写入的内存中表的名称。 内存接收器是必需的。 也可以通过 .queryName(). 进行配置。 |
mode |
exactlyonce |
内存接收器的传递保证。
exactlyonce 使用具有完全一次语义的微批处理模式。
atleastonce 使用具有至少一次语义的连续模式。 有效值: exactlyonce、 atleastonce。 |
Spark 函数选项
某些 Spark SQL 内置函数接受控制 options 分析或序列化行为的映射。 将选项作为 Python dict 或 Scala Map[String, String] 传递。
Example
以下示例在删除格式不正确的记录时分析 JSON 列:
Python
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType
schema = StructType([StructField("name", StringType())])
df = df.withColumn("parsed", from_json("json_col", schema, {"mode": "DROPMALFORMED"}))
Scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
val schema = StructType(Seq(StructField("name", StringType)))
val df = df.withColumn("parsed", from_json(col("json_col"), schema, Map("mode" -> "DROPMALFORMED")))
Avro
Avro 函数接受与相应的数据帧选项相同的选项:
-
from_avro并使用schema_of_avroDataFrameReader Avro 选项。 -
to_avro使用 DataFrameWriter Avro 选项。
Example
以下示例解码启用了架构演变的 Avro 列:
Python
from pyspark.sql.functions import from_avro
df = df.withColumn("decoded", from_avro("avro_col", json_schema, {"avroSchemaEvolutionMode": "restart"}))
Scala
import org.apache.spark.sql.avro.functions.from_avro
val df = df.withColumn("decoded", from_avro(col("avro_col"), jsonSchema, Map("avroSchemaEvolutionMode" -> "restart")))
此外,架构注册表变体和from_avroto_avro接受以下选项:
| 密钥 | 默认 | 说明 |
|---|---|---|
schemaId |
None | 从 Confluent 架构注册表中解码与架构不兼容 jsonFormatSchema的 Avro 数据时要使用的架构 ID。
from_avro仅适用于。 |
confluent.schema.registry.* |
None | Confluent 架构注册表客户端配置属性。 使用此前缀传递任何 Confluent SR 客户端属性,例如 confluent.schema.registry.basic.auth.user.info 基本身份验证凭据。 架构注册表变体 from_avro 和 to_avro. |
CSV
CSV 函数接受与相应的数据帧选项相同的选项:
-
from_csv并使用schema_of_csvDataFrameReader CSV 选项。 -
to_csv使用 DataFrameWriter CSV 选项。
Example
以下示例使用自定义分隔符和 NULL 值读取 CSV:
Python
from pyspark.sql.functions import from_csv
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([StructField("id", IntegerType()), StructField("name", StringType())])
df = df.withColumn("parsed", from_csv("csv_col", schema, {"sep": "|", "nullValue": "N/A"}))
Scala
import org.apache.spark.sql.functions.from_csv
import org.apache.spark.sql.types._
val schema = StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
val df = df.withColumn("parsed", from_csv(col("csv_col"), schema, Map("sep" -> "|", "nullValue" -> "N/A")))
JSON
JSON 函数接受与相应的 DataFrame 选项相同的选项:
-
from_json并使用schema_of_jsonDataFrameReader JSON 选项。 -
to_json使用 DataFrameWriter JSON 选项。
Example
以下示例编写 JSON,其中 NULL 字段被忽略并启用了漂亮的格式设置:
Python
from pyspark.sql.functions import to_json
df = df.withColumn("json_str", to_json("struct_col", {"pretty": "true", "ignoreNullFields": "true"}))
Scala
import org.apache.spark.sql.functions.to_json
val df = df.withColumn("json_str", to_json(col("struct_col"), Map("pretty" -> "true", "ignoreNullFields" -> "true")))
Protobuf
from_protobuf 不使用 to_protobuf 基于文件的 DataSource。 Protobuf 数据始终使用这些函数读取和写入为二进制列。 选项以区分 Map[String, String] 大小写的形式传递。
Example
以下示例使用 PERMISSIVE 模式解码 Protobuf 列:
Python
from pyspark.sql.functions import from_protobuf
df = df.withColumn("decoded", from_protobuf("proto_col", "MyMessage", "/path/to/descriptor.desc",
{"mode": "PERMISSIVE", "enums.as.ints": "true"}))
Scala
import org.apache.spark.sql.protobuf.functions.from_protobuf
val df = df.withColumn("decoded", from_protobuf(col("proto_col"), "MyMessage", "/path/to/descriptor.desc",
Map("mode" -> "PERMISSIVE", "enums.as.ints" -> "true")))
Protobuf 函数使用以下选项:
| 密钥 | 默认 | 说明 |
|---|---|---|
mode |
FAILFAST |
如何处理损坏的记录。
FAILFAST 引发异常。
PERMISSIVE 将格式不正确的字段设置为 null。 有效值: FAILFAST、 PERMISSIVE。 适用于 from_protobuf. |
recursive.fields.max.depth |
-1 (已禁用) |
递归 Protobuf 字段的最大递归深度。 设置为 0 关闭递归字段支持。 有效值: 0 到 10. 适用于 from_protobuf. |
convert.any.fields.to.json |
false |
是否将 Protobuf Any 字段转换为 JSON 字符串而不是 STRUCT. 适用于 from_protobuf. |
emit.default.values |
false |
是发出包含零或默认值(proto3 语义)的字段。 当输出中省略具有默认值的字段时 false。 适用于 from_protobuf. |
enums.as.ints |
false |
是否将枚举字段呈现为整数值而不是字符串。 适用于 from_protobuf. |
upcast.unsigned.ints |
false |
是否向上转换uint32Long并uint64Decimal(20,0)防止整数溢出。 适用于 from_protobuf. |
unwrap.primitive.wrapper.types |
false |
是否将包装包装器类型(例如Int32Value,和StringValue)解包google.protobuf到相应的基元 Spark 类型。 适用于 from_protobuf. |
retain.empty.message.types |
false |
是否通过插入虚拟列来保留输出架构中的空 Protobuf 消息类型。 适用于 from_protobuf. |
schema.registry.subject |
None | 架构注册表使用者名称。 使用架构注册表变体 from_protobuf 和 to_protobuf. |
schema.registry.address |
None | 架构注册表地址(主机和端口)。 使用架构注册表变体 from_protobuf 和 to_protobuf. |
schema.registry.protobuf.name |
None | 指定架构注册表主体包含多个消息时要使用的 Protobuf 消息。 可选。 |
XML
XML 函数接受与相应的 DataFrame 选项相同的选项:
-
from_xml并使用schema_of_xmlDataFrameReader XML 选项。 -
to_xml使用 DataFrameWriter XML 选项。
Example
以下示例使用自定义根标记和行标记编写 XML:
Python
from pyspark.sql.functions import to_xml
df = df.withColumn("xml_str", to_xml("struct_col", {"rootTag": "records", "rowTag": "record"}))
Scala
import org.apache.spark.sql.functions.to_xml
val df = df.withColumn("xml_str", to_xml(col("struct_col"), Map("rootTag" -> "records", "rowTag" -> "record")))