读取和写入 XML 文件

重要

此功能目前以公共预览版提供。

本文介绍如何读取和写入 XML 文件。

可扩展标记语言 (XML) 是一种标记语言,用于以文本格式设置数据格式、存储和共享数据。 它定义一组规则,以序列化从文档到任意数据结构的数据。

本机 XML 文件格式支持启用对 XML 数据的引入、查询和分析,以便进行批处理或流式处理。 它可以自动推理并改进架构和数据类型,支持 SQL 表达式(例如 from_xml)并生成 XML 文档。 它不需要使用外部 jar,可与自动加载程序、read_filesCOPY INTO 无缝配合。 你可以选择性地根据 XML 架构定义 (XSD) 验证每个行级 XML 记录。

要求

Databricks Runtime 14.3 及更高版本

分析 XML 记录

XML 规范规定格式标准的结构。 但是,这一指定不会立即映射到表格格式。 必须指定 rowTag 选项以指示映射到 DataFrame Row 的 XML 元素。 该 rowTag 元素将成为顶级 struct 元素。 rowTag 的子元素将成为顶级 struct 的字段。

可以为此记录指定架构,也可以让它自动推断。 由于分析程序仅检查 rowTag 元素,因此会筛选掉 DTD 和外部实体。

以下示例演示了使用不同 rowTag 选项对 XML 文件进行架构推理和分析:

Python

xmlString = """
  <books>
    <book id="bk103">
      <author>Corets, Eva</author>
      <title>Maeve Ascendant</title>
    </book>
    <book id="bk104">
      <author>Corets, Eva</author>
      <title>Oberon's Legacy</title>
    </book>
  </books>"""

xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString, True)

Scala

val xmlString = """
  <books>
    <book id="bk103">
      <author>Corets, Eva</author>
      <title>Maeve Ascendant</title>
    </book>
    <book id="bk104">
      <author>Corets, Eva</author>
      <title>Oberon's Legacy</title>
    </book>
  </books>"""
val xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString)

读取 rowTag 选项为“books”的 XML 文件:

Python

df = spark.read.option("rowTag", "books").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)

Scala

val df = spark.read.option("rowTag", "books").xml(xmlPath)
df.printSchema()
df.show(truncate=false)

输出:

root
|-- book: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- title: string (nullable = true)

+------------------------------------------------------------------------------+
|book                                                                          |
+------------------------------------------------------------------------------+
|[{bk103, Corets, Eva, Maeve Ascendant}, {bk104, Corets, Eva, Oberon's Legacy}]|
+------------------------------------------------------------------------------+

读取 rowTag 为“book”的 XML 文件:

Python

df = spark.read.option("rowTag", "book").format("xml").load(xmlPath)
# Infers three top-level fields and parses `book` in separate rows:

Scala

val df = spark.read.option("rowTag", "book").xml(xmlPath)
// Infers three top-level fields and parses `book` in separate rows:

输出:

root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- title: string (nullable = true)

+-----+-----------+---------------+
|_id  |author     |title          |
+-----+-----------+---------------+
|bk103|Corets, Eva|Maeve Ascendant|
|bk104|Corets, Eva|Oberon's Legacy|
+-----+-----------+---------------+

数据源选项

可以通过以下方式指定 XML 的数据源选项:

如需选项列表,请参阅自动加载程序选项

XSD 支持

可以选择性地通过 XML 架构定义 (XSD) 验证每个行级 XML 记录。 XSD 文件在 rowValidationXSDPath 选项中指定。 XSD 不会以其他方式影响提供或推理的架构。 验证失败的记录被标记为“已损坏”,并根据选项部分中所述的损坏记录处理模式选项进行处理。

可以使用 XSDToSchema 从 XSD 文件中提取 Spark DataFrame 架构。 它仅支持简单类型、复杂类型和序列类型,仅支持基本 XSD 功能。

import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path

val xsdPath = "dbfs:/tmp/books.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
  <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
    <xs:element name="book">
      <xs:complexType>
        <xs:sequence>
          <xs:element name="author" type="xs:string" />
          <xs:element name="title" type="xs:string" />
          <xs:element name="genre" type="xs:string" />
          <xs:element name="price" type="xs:decimal" />
          <xs:element name="publish_date" type="xs:date" />
          <xs:element name="description" type="xs:string" />
        </xs:sequence>
        <xs:attribute name="id" type="xs:string" use="required" />
      </xs:complexType>
    </xs:element>
  </xs:schema>"""

dbutils.fs.put(xsdPath, xsdString, true)

val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))

下表显示了 XSD 数据类型与 Spark 数据类型之间的转换:

XSD 数据类型 Spark 数据类型
boolean BooleanType
decimal DecimalType
unsignedLong DecimalType(38, 0)
double DoubleType
float FloatType
byte ByteType
short, unsignedByte ShortType
integer, negativeInteger, nonNegativeInteger, nonPositiveInteger, positiveInteger, unsignedShort IntegerType
long, unsignedInt LongType
date DateType
dateTime TimestampType
Others StringType

分析嵌套 XML

可以使用 schema_of_xmlfrom_xml 分析现有 DataFrame 字符串值列中的 XML 数据,并将架构和分析的结果作为新 struct 列返回。 作为自变量传递到 schema_of_xmlfrom_xml 的 XML 数据必须是单个格式标准的 XML 记录。

schema_of_xml

语法

schema_of_xml(xmlStr [, options] )

参数

  • xmlStr:一个 STRING 表达式,用于指定单个格式标准的 XML 记录。
  • options:指定指令的可选MAP<STRING,STRING>文本。

返回

一个包含结构定义的 STRING,该结构具有 n 个字符串字段,其中列名称派生自 XML 元素和属性名称。 这些字段值保存派生的格式化 SQL 类型。

from_xml

语法

from_xml(xmlStr, schema [, options])

参数

  • xmlStr:一个 STRING 表达式,用于指定单个格式标准的 XML 记录。
  • schemaschema_of_xml 函数的 STRING 表达式或调用。
  • options:指定指令的可选MAP<STRING,STRING>文本。

返回

一个结构,其字段名称和类型与架构定义匹配。 架构必须定义为逗号分隔的列名称和数据类型对(例如,与在 CREATE TABLE 中使用一样)。 数据源选项中显示的大多数选项都适用,但有以下例外情况:

  • rowTag:由于只有一条 XML 记录,因此 rowTag 选项不适用。
  • mode(默认值为 PERMISSIVE):允许采用在分析期间处理损坏记录的模式。
    • PERMISSIVE:遇到损坏的记录时,将格式错误的字符串放入由 columnNameOfCorruptRecord 配置的字段中,并将格式错误的字段设置为 null。 若要保留损坏的记录,可以在用户定义的架构中设置名为 columnNameOfCorruptRecord 的字符串类型字段。 如果架构没有该字段,则会在分析期间删除损坏的记录。 推理架构时,它会在输出架构中隐式添加 columnNameOfCorruptRecord 字段。
    • FAILFAST:遇到损坏的记录时引发异常。

结构转换

由于 DataFrame 和 XML 之间存在结构差异,因此对于从 XML 数据转换为 DataFrame 以及从 DataFrame 转换为 XML 数据来说,有一些转换规则。 请注意,可以使用选项 excludeAttribute 禁用处理属性。

从 XML 转换为 DataFrame

属性:属性将转换为具有标题前缀 attributePrefix 的字段。

<one myOneAttrib="AAAA">
  <two>two</two>
  <three>three</three>
</one>

生成以下架构:

root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)

包含属性或子元素的元素中的字符数据:这些数据在 valueTag 字段中解析。 如果字符数据多次出现,则 valueTag 字段将转换为类型 array

<one>
  <two myTwoAttrib="BBBBB">two</two>
  some value between elements
  <three>three</three>
  some other value between elements
</one>

生成以下架构:

root
 |-- _VALUE: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- two: struct (nullable = true)
 |    |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)

从 DataFrame 转换为 XML

元素作为数组中的数组:从具有 ArrayType 字段且其元素为 ArrayTypeDataFrame 写入 XML 文件时,将为该元素提供额外的嵌套字段。 读写 XML 数据时不会发生这种情况,但写入从其他源读取的 DataFrame 时会。 因此,读写和写读 XML 文件具有同一结构,但写入从其他源读取的 DataFrame 可能具有另一结构。

DataFrame 具有以下架构:

|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)

以及以下数据:

+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+

生成以下 XML 文件:

<a>
  <item>aa</item>
</a>
<a>
  <item>bb</item>
</a>

选项 arrayElementName 指定 DataFrame 中未命名数组的元素名称(默认值:item)。

补救数据列

补救数据列可确保在 ETL 期间不会丢失或错过数据。 可以启用补救的数据列来捕获任何未分析的数据,因为记录中的一个或多个字段存在以下问题之一:

  • 不存在于提供的架构中
  • 与提供的架构的数据类型不匹配
  • 与提供的架构中的字段名称大小写不匹配

补救数据列以 JSON 文档形式返回,其中包含已补救的列和记录的源文件路径。 若要从补救的数据列中删除源文件路径,可以设置以下 SQL 配置:

Python

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")

Scala

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false").

你可以通过在读取数据时将选项 rescuedDataColumn 设置为某个列名来启用补救的数据列,例如,spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load(<path>)_rescued_data

分析记录时,XML 分析程序支持三种模式:PERMISSIVEDROPMALFORMEDFAILFAST。 与 rescuedDataColumn 一起使用时,数据类型不匹配不会导致在 DROPMALFORMED 模式下删除记录,或者在 FAILFAST 模式下引发错误。 只有损坏的记录(即不完整或格式错误的 XML)会被删除或引发错误。

自动加载程序中的架构参考和演化

有关本主题和适用选项的详细讨论,请参阅在自动加载程序中配置架构推理和演变。 可以将自动加载程序配置为自动检测已加载 XML 数据的架构,这样无需显式声明数据架构即可初始化表,并在引入新列时让表架构完成相应的演变。 这样就无需一直手动跟踪和应用架构更改。

默认情况下,自动加载程序架构推理会试图避免由于类型不匹配而出现的架构演变问题。 对于不对数据类型(JSON、CSV 和 XML)进行编码的格式,自动加载程序会将所有列推理为字符串,包括 XML 文件中的嵌套字段。 Apache Spark DataFrameReader 使用不同的行为进行架构推理,根据示例数据为 XML 源中的列选择数据类型。 若要使用自动加载程序实现此行为,请将选项 cloudFiles.inferColumnTypes 设置为 true

自动加载程序在处理数据时会检测是否添加了新列。 当自动加载程序检测到新列时,流会停止并出现 UnknownFieldException。 在流引发此错误之前,自动加载程序会在最新的数据微批上执行架构推理,并通过将新列合并到架构末尾来使用最新架构更新架构位置。 现有列的数据类型将保持不变。 自动加载程序支持不同架构演变模式,可以在选项 cloudFiles.schemaEvolutionMode 中设置这些模式。

通过使用架构提示,可以强制实施你所知道的并期望出现在推理架构中的信息。 如果你知道某列采用特定的属性类型,或者想要选择更常规的数据类型(例如不使用整数,而改用双精度),可以为列数据类型提供任意数量的提示(格式为符合 SQL 架构规范语法的字符串)。 启用补救数据列时,以非架构所用的大小写形式命名的字段将加载到 _rescued_data 列。 你可以通过将选项 readerCaseSensitive 设置为 false 来更改此行为,在这种情况下自动加载程序将以不区分大小写的方式读取数据。

示例

本部分中的示例使用可在 Apache Spark GitHub 存储库中下载的 XML 文件。

读取和写入 XML

Python

df = (spark.read
  .format('xml')
  .options(rowTag='book')
  .load(xmlPath))  # books.xml

selected_data = df.select("author", "_id")
(selected_data.write
  .options(rowTag='book', rootTag='books')
  .xml('newbooks.xml'))

Scala

val df = spark.read
  .option("rowTag", "book")
  .xml(xmlPath) // books.xml

val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("newbooks.xml")

R

df <- loadDF("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")

读取数据时,可以手动指定架构:

Python

from pyspark.sql.types import StructType, StructField, StringType, DoubleType

custom_schema = StructType([
    StructField("_id", StringType(), True),
    StructField("author", StringType(), True),
    StructField("description", StringType(), True),
    StructField("genre", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("publish_date", StringType(), True),
    StructField("title", StringType(), True)
])
df = spark.read.options(rowTag='book').xml('books.xml', schema = customSchema)

selected_data = df.select("author", "_id")
selected_data.write.options(rowTag='book', rootTag='books').xml('newbooks.xml')

Scala

import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}

val customSchema = StructType(Array(
  StructField("_id", StringType, nullable = true),
  StructField("author", StringType, nullable = true),
  StructField("description", StringType, nullable = true),
  StructField("genre", StringType, nullable = true),
  StructField("price", DoubleType, nullable = true),
  StructField("publish_date", StringType, nullable = true),
  StructField("title", StringType, nullable = true)))
val df = spark.read.option("rowTag", "book").schema(customSchema).xml(xmlPath) // books.xml

val selectedData = df.select("author", "_id")
selectedData.write.option("rootTag", "books").option("rowTag", "book").xml("newbooks.xml")

R

customSchema <- structType(
  structField("_id", "string"),
  structField("author", "string"),
  structField("description", "string"),
  structField("genre", "string"),
  structField("price", "double"),
  structField("publish_date", "string"),
  structField("title", "string"))

df <- loadDF("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")

SQL API

XML 数据源可以推断数据类型:

DROP TABLE IF EXISTS books;
CREATE TABLE books
USING XML
OPTIONS (path "books.xml", rowTag "book");
SELECT * FROM books;

还可以在 DDL 中指定列名和类型。 在这种情况下,不会自动推断架构。

DROP TABLE IF EXISTS books;

CREATE TABLE books (author string, description string, genre string, _id string,
price double, publish_date string, title string)
USING XML
OPTIONS (path "books.xml", rowTag "book");

使用 COPY INTO 加载 XML

DROP TABLE IF EXISTS books;
CREATE TABLE IF NOT EXISTS books;

COPY INTO books
FROM "/FileStore/xmltestDir/input/books.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'book')
COPY_OPTIONS ('mergeSchema' = 'true');

使用行验证读取 XML

Python

df = (spark.read
    .format("xml")
    .option("rowTag", "book")
    .option("rowValidationXSDPath", xsdPath)
    .load(inputPath))
df.printSchema()

Scala

val df = spark.read
  .option("rowTag", "book")
  .option("rowValidationXSDPath", xsdPath)
  .xml(inputPath)
df.printSchema

分析嵌套 XML(from_xml 和 schema_of_xml)

Python

from pyspark.sql.functions import from_xml, schema_of_xml, lit, col

xml_data = """
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>
"""

df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()

Scala

import org.apache.spark.sql.functions.{from_xml,schema_of_xml,lit}

val xmlData = """
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>""".stripMargin

val df = Seq((8, xmlData)).toDF("number", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()

使用 SQL API 的 from_xml 和 schema_of_xml

SELECT from_xml('
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>',
  schema_of_xml('
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>')
);

使用自动加载程序加载 XML

Python

query = (spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "xml")
  .option("rowTag", "book")
  .option("cloudFiles.inferColumnTypes", True)
  .option("cloudFiles.schemaLocation", schemaPath)
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load(inputPath)
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", checkPointPath)
  .trigger(availableNow=True)
  .toTable("table_name")
)

Scala

val query = spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "xml")
  .option("rowTag", "book")
  .option("cloudFiles.inferColumnTypes", true)
  .option("cloudFiles.schemaLocation", schemaPath)
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load(inputPath)
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", checkPointPath)
  .trigger(Trigger.AvailableNow()
  .toTable("table_name")
  )

其他资源

使用 spark-xml 库读取和写入 XML 数据