读取和写入 XML 文件
重要
此功能目前以公共预览版提供。
本文介绍如何读取和写入 XML 文件。
可扩展标记语言 (XML) 是一种标记语言,用于以文本格式设置数据格式、存储和共享数据。 它定义一组规则,以序列化从文档到任意数据结构的数据。
本机 XML 文件格式支持启用对 XML 数据的引入、查询和分析,以便进行批处理或流式处理。 它可以自动推理并改进架构和数据类型,支持 SQL 表达式(例如 from_xml
)并生成 XML 文档。 它不需要使用外部 jar,可与自动加载程序、read_files
和 COPY INTO
无缝配合。 你可以选择性地根据 XML 架构定义 (XSD) 验证每个行级 XML 记录。
要求
Databricks Runtime 14.3 及更高版本
分析 XML 记录
XML 规范规定格式标准的结构。 但是,这一指定不会立即映射到表格格式。 必须指定 rowTag
选项以指示映射到 DataFrame
Row
的 XML 元素。 该 rowTag
元素将成为顶级 struct
元素。 rowTag
的子元素将成为顶级 struct
的字段。
可以为此记录指定架构,也可以让它自动推断。 由于分析程序仅检查 rowTag
元素,因此会筛选掉 DTD 和外部实体。
以下示例演示了使用不同 rowTag
选项对 XML 文件进行架构推理和分析:
Python
xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString, True)
Scala
val xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
val xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString)
读取 rowTag
选项为“books”的 XML 文件:
Python
df = spark.read.option("rowTag", "books").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)
Scala
val df = spark.read.option("rowTag", "books").xml(xmlPath)
df.printSchema()
df.show(truncate=false)
输出:
root
|-- book: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- title: string (nullable = true)
+------------------------------------------------------------------------------+
|book |
+------------------------------------------------------------------------------+
|[{bk103, Corets, Eva, Maeve Ascendant}, {bk104, Corets, Eva, Oberon's Legacy}]|
+------------------------------------------------------------------------------+
读取 rowTag
为“book”的 XML 文件:
Python
df = spark.read.option("rowTag", "book").format("xml").load(xmlPath)
# Infers three top-level fields and parses `book` in separate rows:
Scala
val df = spark.read.option("rowTag", "book").xml(xmlPath)
// Infers three top-level fields and parses `book` in separate rows:
输出:
root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- title: string (nullable = true)
+-----+-----------+---------------+
|_id |author |title |
+-----+-----------+---------------+
|bk103|Corets, Eva|Maeve Ascendant|
|bk104|Corets, Eva|Oberon's Legacy|
+-----+-----------+---------------+
数据源选项
可以通过以下方式指定 XML 的数据源选项:
- 以下
.option/.options
方法:- DataFrameReader
- DataFrameWriter
- DataStreamReader
- DataStreamWriter
- 以下内置函数:
- CREATE TABLE USING DATA_SOURCE 的
OPTIONS
子句
如需选项列表,请参阅自动加载程序选项。
XSD 支持
可以选择性地通过 XML 架构定义 (XSD) 验证每个行级 XML 记录。 XSD 文件在 rowValidationXSDPath
选项中指定。 XSD 不会以其他方式影响提供或推理的架构。 验证失败的记录被标记为“已损坏”,并根据选项部分中所述的损坏记录处理模式选项进行处理。
可以使用 XSDToSchema
从 XSD 文件中提取 Spark DataFrame 架构。 它仅支持简单类型、复杂类型和序列类型,仅支持基本 XSD 功能。
import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path
val xsdPath = "dbfs:/tmp/books.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="book">
<xs:complexType>
<xs:sequence>
<xs:element name="author" type="xs:string" />
<xs:element name="title" type="xs:string" />
<xs:element name="genre" type="xs:string" />
<xs:element name="price" type="xs:decimal" />
<xs:element name="publish_date" type="xs:date" />
<xs:element name="description" type="xs:string" />
</xs:sequence>
<xs:attribute name="id" type="xs:string" use="required" />
</xs:complexType>
</xs:element>
</xs:schema>"""
dbutils.fs.put(xsdPath, xsdString, true)
val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))
下表显示了 XSD 数据类型与 Spark 数据类型之间的转换:
XSD 数据类型 | Spark 数据类型 |
---|---|
boolean |
BooleanType |
decimal |
DecimalType |
unsignedLong |
DecimalType(38, 0) |
double |
DoubleType |
float |
FloatType |
byte |
ByteType |
short ,unsignedByte |
ShortType |
integer ,negativeInteger ,nonNegativeInteger ,nonPositiveInteger ,positiveInteger ,unsignedShort |
IntegerType |
long ,unsignedInt |
LongType |
date |
DateType |
dateTime |
TimestampType |
Others |
StringType |
分析嵌套 XML
可以使用 schema_of_xml
和 from_xml
分析现有 DataFrame 字符串值列中的 XML 数据,并将架构和分析的结果作为新 struct
列返回。 作为自变量传递到 schema_of_xml
和 from_xml
的 XML 数据必须是单个格式标准的 XML 记录。
schema_of_xml
语法
schema_of_xml(xmlStr [, options] )
参数
xmlStr
:一个 STRING 表达式,用于指定单个格式标准的 XML 记录。options
:指定指令的可选MAP<STRING,STRING>
文本。
返回
一个包含结构定义的 STRING,该结构具有 n 个字符串字段,其中列名称派生自 XML 元素和属性名称。 这些字段值保存派生的格式化 SQL 类型。
from_xml
语法
from_xml(xmlStr, schema [, options])
参数
xmlStr
:一个 STRING 表达式,用于指定单个格式标准的 XML 记录。schema
:schema_of_xml
函数的 STRING 表达式或调用。options
:指定指令的可选MAP<STRING,STRING>
文本。
返回
一个结构,其字段名称和类型与架构定义匹配。 架构必须定义为逗号分隔的列名称和数据类型对(例如,与在 CREATE TABLE
中使用一样)。 数据源选项中显示的大多数选项都适用,但有以下例外情况:
rowTag
:由于只有一条 XML 记录,因此rowTag
选项不适用。mode
(默认值为PERMISSIVE
):允许采用在分析期间处理损坏记录的模式。PERMISSIVE
:遇到损坏的记录时,将格式错误的字符串放入由columnNameOfCorruptRecord
配置的字段中,并将格式错误的字段设置为null
。 若要保留损坏的记录,可以在用户定义的架构中设置名为columnNameOfCorruptRecord
的字符串类型字段。 如果架构没有该字段,则会在分析期间删除损坏的记录。 推理架构时,它会在输出架构中隐式添加columnNameOfCorruptRecord
字段。FAILFAST
:遇到损坏的记录时引发异常。
结构转换
由于 DataFrame 和 XML 之间存在结构差异,因此对于从 XML 数据转换为 DataFrame
以及从 DataFrame
转换为 XML 数据来说,有一些转换规则。 请注意,可以使用选项 excludeAttribute
禁用处理属性。
从 XML 转换为 DataFrame
属性:属性将转换为具有标题前缀 attributePrefix
的字段。
<one myOneAttrib="AAAA">
<two>two</two>
<three>three</three>
</one>
生成以下架构:
root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)
包含属性或子元素的元素中的字符数据:这些数据在 valueTag
字段中解析。 如果字符数据多次出现,则 valueTag
字段将转换为类型 array
。
<one>
<two myTwoAttrib="BBBBB">two</two>
some value between elements
<three>three</three>
some other value between elements
</one>
生成以下架构:
root
|-- _VALUE: array (nullable = true)
| |-- element: string (containsNull = true)
|-- two: struct (nullable = true)
| |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)
从 DataFrame 转换为 XML
元素作为数组中的数组:从具有 ArrayType
字段且其元素为 ArrayType
的 DataFrame
写入 XML 文件时,将为该元素提供额外的嵌套字段。 读写 XML 数据时不会发生这种情况,但写入从其他源读取的 DataFrame
时会。 因此,读写和写读 XML 文件具有同一结构,但写入从其他源读取的 DataFrame
可能具有另一结构。
DataFrame 具有以下架构:
|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
以及以下数据:
+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+
生成以下 XML 文件:
<a>
<item>aa</item>
</a>
<a>
<item>bb</item>
</a>
选项 arrayElementName
指定 DataFrame
中未命名数组的元素名称(默认值:item
)。
补救数据列
补救数据列可确保在 ETL 期间不会丢失或错过数据。 可以启用补救的数据列来捕获任何未分析的数据,因为记录中的一个或多个字段存在以下问题之一:
- 不存在于提供的架构中
- 与提供的架构的数据类型不匹配
- 与提供的架构中的字段名称大小写不匹配
补救数据列以 JSON 文档形式返回,其中包含已补救的列和记录的源文件路径。 若要从补救的数据列中删除源文件路径,可以设置以下 SQL 配置:
Python
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")
Scala
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false").
你可以通过在读取数据时将选项 rescuedDataColumn
设置为某个列名来启用补救的数据列,例如,spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load(<path>)
的 _rescued_data
。
分析记录时,XML 分析程序支持三种模式:PERMISSIVE
、DROPMALFORMED
和 FAILFAST
。 与 rescuedDataColumn
一起使用时,数据类型不匹配不会导致在 DROPMALFORMED
模式下删除记录,或者在 FAILFAST
模式下引发错误。 只有损坏的记录(即不完整或格式错误的 XML)会被删除或引发错误。
自动加载程序中的架构参考和演化
有关本主题和适用选项的详细讨论,请参阅在自动加载程序中配置架构推理和演变。 可以将自动加载程序配置为自动检测已加载 XML 数据的架构,这样无需显式声明数据架构即可初始化表,并在引入新列时让表架构完成相应的演变。 这样就无需一直手动跟踪和应用架构更改。
默认情况下,自动加载程序架构推理会试图避免由于类型不匹配而出现的架构演变问题。 对于不对数据类型(JSON、CSV 和 XML)进行编码的格式,自动加载程序会将所有列推理为字符串,包括 XML 文件中的嵌套字段。 Apache Spark DataFrameReader
使用不同的行为进行架构推理,根据示例数据为 XML 源中的列选择数据类型。 若要使用自动加载程序实现此行为,请将选项 cloudFiles.inferColumnTypes
设置为 true
。
自动加载程序在处理数据时会检测是否添加了新列。 当自动加载程序检测到新列时,流会停止并出现 UnknownFieldException
。 在流引发此错误之前,自动加载程序会在最新的数据微批上执行架构推理,并通过将新列合并到架构末尾来使用最新架构更新架构位置。 现有列的数据类型将保持不变。 自动加载程序支持不同架构演变模式,可以在选项 cloudFiles.schemaEvolutionMode
中设置这些模式。
通过使用架构提示,可以强制实施你所知道的并期望出现在推理架构中的信息。 如果你知道某列采用特定的属性类型,或者想要选择更常规的数据类型(例如不使用整数,而改用双精度),可以为列数据类型提供任意数量的提示(格式为符合 SQL 架构规范语法的字符串)。 启用补救数据列时,以非架构所用的大小写形式命名的字段将加载到 _rescued_data
列。 你可以通过将选项 readerCaseSensitive
设置为 false
来更改此行为,在这种情况下自动加载程序将以不区分大小写的方式读取数据。
示例
本部分中的示例使用可在 Apache Spark GitHub 存储库中下载的 XML 文件。
读取和写入 XML
Python
df = (spark.read
.format('xml')
.options(rowTag='book')
.load(xmlPath)) # books.xml
selected_data = df.select("author", "_id")
(selected_data.write
.options(rowTag='book', rootTag='books')
.xml('newbooks.xml'))
Scala
val df = spark.read
.option("rowTag", "book")
.xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write
.option("rootTag", "books")
.option("rowTag", "book")
.xml("newbooks.xml")
R
df <- loadDF("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
读取数据时,可以手动指定架构:
Python
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
custom_schema = StructType([
StructField("_id", StringType(), True),
StructField("author", StringType(), True),
StructField("description", StringType(), True),
StructField("genre", StringType(), True),
StructField("price", DoubleType(), True),
StructField("publish_date", StringType(), True),
StructField("title", StringType(), True)
])
df = spark.read.options(rowTag='book').xml('books.xml', schema = customSchema)
selected_data = df.select("author", "_id")
selected_data.write.options(rowTag='book', rootTag='books').xml('newbooks.xml')
Scala
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}
val customSchema = StructType(Array(
StructField("_id", StringType, nullable = true),
StructField("author", StringType, nullable = true),
StructField("description", StringType, nullable = true),
StructField("genre", StringType, nullable = true),
StructField("price", DoubleType, nullable = true),
StructField("publish_date", StringType, nullable = true),
StructField("title", StringType, nullable = true)))
val df = spark.read.option("rowTag", "book").schema(customSchema).xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write.option("rootTag", "books").option("rowTag", "book").xml("newbooks.xml")
R
customSchema <- structType(
structField("_id", "string"),
structField("author", "string"),
structField("description", "string"),
structField("genre", "string"),
structField("price", "double"),
structField("publish_date", "string"),
structField("title", "string"))
df <- loadDF("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
SQL API
XML 数据源可以推断数据类型:
DROP TABLE IF EXISTS books;
CREATE TABLE books
USING XML
OPTIONS (path "books.xml", rowTag "book");
SELECT * FROM books;
还可以在 DDL 中指定列名和类型。 在这种情况下,不会自动推断架构。
DROP TABLE IF EXISTS books;
CREATE TABLE books (author string, description string, genre string, _id string,
price double, publish_date string, title string)
USING XML
OPTIONS (path "books.xml", rowTag "book");
使用 COPY INTO 加载 XML
DROP TABLE IF EXISTS books;
CREATE TABLE IF NOT EXISTS books;
COPY INTO books
FROM "/FileStore/xmltestDir/input/books.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'book')
COPY_OPTIONS ('mergeSchema' = 'true');
使用行验证读取 XML
Python
df = (spark.read
.format("xml")
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.load(inputPath))
df.printSchema()
Scala
val df = spark.read
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.xml(inputPath)
df.printSchema
分析嵌套 XML(from_xml 和 schema_of_xml)
Python
from pyspark.sql.functions import from_xml, schema_of_xml, lit, col
xml_data = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>
"""
df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()
Scala
import org.apache.spark.sql.functions.{from_xml,schema_of_xml,lit}
val xmlData = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>""".stripMargin
val df = Seq((8, xmlData)).toDF("number", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()
使用 SQL API 的 from_xml 和 schema_of_xml
SELECT from_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>',
schema_of_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>')
);
使用自动加载程序加载 XML
Python
query = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", True)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(availableNow=True)
.toTable("table_name")
)
Scala
val query = spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", true)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(Trigger.AvailableNow()
.toTable("table_name")
)