XML file

This article describes how to read and write an XML file as an Apache Spark data source.

Requirements

  1. Create the spark-xml library as a Maven library. For the Maven coordinate, specify:

    • Databricks Runtime 7.x: com.databricks:spark-xml_2.12:<release>
    • Databricks Runtime 5.5 LTS and 6.x: com.databricks:spark-xml_2.11:<release>

    See spark-xml Releases for the latest version of <release>.

  2. Install the library on a cluster.

Example

The examples in this section use the books XML file.

  1. Retrieve the books XML file:

    $ wget https://github.com/databricks/spark-xml/raw/master/src/test/resources/books.xml
    
  2. Upload the file to DBFS.
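
    A minimal sketch of one way to do this from a Databricks notebook, assuming the wget above saved the file to the driver's local filesystem:

    // Copy the downloaded file from the driver's local disk to DBFS
    // (dbutils is available in Databricks notebooks)
    dbutils.fs.cp("file:/databricks/driver/books.xml", "dbfs:/books.xml")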

Read and write XML data

SQL

/*Infer schema*/

CREATE TABLE books
USING xml
OPTIONS (path "dbfs:/books.xml", rowTag "book")

/*Specify column names and types*/

CREATE TABLE books (author string, description string, genre string, _id string, price double, publish_date string, title string)
USING xml
OPTIONS (path "dbfs:/books.xml", rowTag "book")

Scala

// Infer schema

import com.databricks.spark.xml._ // Add the DataFrame.read.xml() method

val df = spark.read
  .option("rowTag", "book")
  .xml("dbfs:/books.xml")

val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("dbfs:/newbooks.xml")

// Specify schema

import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}

val customSchema = StructType(Array(
  StructField("_id", StringType, nullable = true),
  StructField("author", StringType, nullable = true),
  StructField("description", StringType, nullable = true),
  StructField("genre", StringType, nullable = true),
  StructField("price", DoubleType, nullable = true),
  StructField("publish_date", StringType, nullable = true),
  StructField("title", StringType, nullable = true)))

val df = spark.read
  .option("rowTag", "book")
  .schema(customSchema)
  .xml("dbfs:/books.xml")

val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("dbfs:/newbooks.xml")

R

# Infer schema

library(SparkR)

sparkR.session("local[4]", sparkPackages = c("com.databricks:spark-xml_2.12:<release>"))

df <- read.df("dbfs:/books.xml", source = "xml", rowTag = "book")

# Default `rootTag` and `rowTag`
write.df(df, "dbfs:/newbooks.xml", "xml")

# Specify schema

customSchema <- structType(
  structField("_id", "string"),
  structField("author", "string"),
  structField("description", "string"),
  structField("genre", "string"),
  structField("price", "double"),
  structField("publish_date", "string"),
  structField("title", "string"))

df <- read.df("dbfs:/books.xml", source = "xml", schema = customSchema, rowTag = "book")

# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
write.df(df, "dbfs:/newbooks.xml", "xml", "overwrite")

Options

  • Read
    • path: Location of XML files. Accepts standard Hadoop globbing expressions.
    • rowTag: The row tag to treat as a row. For example, in this XML <books><book>...</book>...</books>, the value would be book. Default is ROW.
    • samplingRatio: Sampling ratio for inferring the schema (0.0 ~ 1). Default is 1. Possible inferred types are StructType, ArrayType, StringType, LongType, DoubleType, BooleanType, TimestampType, and NullType, unless you provide a schema.
    • excludeAttribute: Whether to exclude attributes in elements. Default is false.
    • nullValue: The value to treat as a null value. Default is "".
    • mode: The mode for dealing with corrupt records (see the sketch after this list). Default is PERMISSIVE.
      • PERMISSIVE:
        • When it encounters a corrupted record, sets all fields to null and puts the malformed string into a new field configured by columnNameOfCorruptRecord.
        • When it encounters a field of the wrong data type, sets the offending field to null.
      • DROPMALFORMED: ignores corrupted records.
      • FAILFAST: throws an exception when it detects corrupted records.
    • inferSchema: If true, attempts to infer an appropriate type for each resulting DataFrame column, such as a boolean, numeric, or date type. If false, all resulting columns are of string type. Default is true.
    • columnNameOfCorruptRecord: The name of the new field where malformed strings are stored. Default is _corrupt_record.
    • attributePrefix: The prefix for attributes, used to differentiate attributes from elements. This is the prefix for field names. Default is _.
    • valueTag: The tag used for the value when an element has attributes but no child elements. Default is _VALUE.
    • charset: Defaults to UTF-8 but can be set to other valid charset names.
    • ignoreSurroundingSpaces: Whether whitespace surrounding values should be skipped. Default is false.
    • rowValidationXSDPath: Path to an XSD file that is used to validate the XML for each row. Rows that fail to validate are treated like parse errors, as above. The XSD does not otherwise affect the schema provided or inferred. If the same local path is not already visible on the executors in the cluster, then the XSD and any others it depends on should be added to the Spark executors with SparkContext.addFile. In that case, to use a local XSD /foo/bar.xsd, call addFile("/foo/bar.xsd") and pass "bar.xsd" as rowValidationXSDPath.
  • Write
    • path: Location to write files.
    • rowTag: The row tag to treat as a row. For example, in this XML <books><book>...</book>...</books>, the value would be book. Default is ROW.
    • rootTag: The root tag to treat as the root. For example, in this XML <books><book>...</book>...</books>, the value would be books. Default is ROWS.
    • nullValue: The value to write in place of null values. Default is the string "null". When it is "null", attributes and elements for null fields are not written.
    • attributePrefix: The prefix for attributes, used to differentiate attributes from elements. This is the prefix for field names. Default is _.
    • valueTag: The tag used for the value when an element has attributes but no child elements. Default is _VALUE.
    • compression: Compression codec to use when saving to file. Should be the fully qualified name of a class implementing org.apache.hadoop.io.compress.CompressionCodec or one of the case-insensitive short names (bzip2, gzip, lz4, and snappy). Default is no compression.
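
For example, the mode and columnNameOfCorruptRecord read options can be combined so that malformed rows are captured instead of silently dropped. A minimal Scala sketch, assuming the books.xml file from the example above:

import com.databricks.spark.xml._
import org.apache.spark.sql.types.{StructType, StructField, StringType}

// Include a _corrupt_record column in the schema so PERMISSIVE mode has
// somewhere to put the malformed string for each bad row
val schemaWithCorrupt = StructType(Array(
  StructField("author", StringType, nullable = true),
  StructField("title", StringType, nullable = true),
  StructField("_corrupt_record", StringType, nullable = true)))

val df = spark.read
  .option("rowTag", "book")
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .schema(schemaWithCorrupt)
  .xml("dbfs:/books.xml")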

The shortened data source name is supported: you can use xml instead of com.databricks.spark.xml, as shown below.
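
A minimal sketch of reading with the generic format/load API, assuming the same books.xml file:

// The short name "xml" resolves to com.databricks.spark.xml
val df = spark.read
  .format("xml")
  .option("rowTag", "book")
  .load("dbfs:/books.xml")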

XSD support

You can validate individual rows against an XSD schema by using rowValidationXSDPath, as shown in the sketch below.
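
A minimal sketch of row validation, assuming a local XSD at /foo/bar.xsd as described in the rowValidationXSDPath option above:

import com.databricks.spark.xml._

// Distribute the XSD to the executors, then reference it by its base name
spark.sparkContext.addFile("/foo/bar.xsd")

val df = spark.read
  .option("rowTag", "book")
  .option("rowValidationXSDPath", "bar.xsd")
  .xml("dbfs:/books.xml")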

You can use the utility com.databricks.spark.xml.util.XSDToSchema to extract a Spark DataFrame schema from some XSD files. It supports only simple, complex, and sequence types, covers only basic XSD functionality, and is experimental.

import com.databricks.spark.xml.util.XSDToSchema
import java.nio.file.Paths

val schema = XSDToSchema.read(Paths.get("/path/to/your.xsd"))
val df = spark.read.schema(schema)....xml(...)

Parse nested XML

Although primarily used to convert an XML file into a DataFrame, the from_xml method can also be used to parse XML in a string-valued column of an existing DataFrame and add it as a new column with the parsed results as a struct:

import com.databricks.spark.xml.functions.from_xml
import com.databricks.spark.xml.schema_of_xml
import spark.implicits._

val df = ... // DataFrame with XML in column 'payload'
val payloadSchema = schema_of_xml(df.select("payload").as[String])
val parsed = df.withColumn("parsed", from_xml($"payload", payloadSchema))

Note

  • mode:
    • If set to PERMISSIVE, the default, the parse mode instead defaults to DROPMALFORMED. If you include a column in the schema for from_xml that matches the columnNameOfCorruptRecord, then PERMISSIVE mode outputs malformed records to that column in the resulting struct.
    • If set to DROPMALFORMED, XML values that do not parse correctly result in a null value for the column. No rows are dropped.
  • from_xml converts arrays of strings containing XML to arrays of parsed structs. In that case, use schema_of_xml_array instead (see the sketch after this list).
  • from_xml_string is an alternative for use in UDFs that operates on a String directly instead of a column.
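
For the array case, a hedged sketch that mirrors the from_xml example above; the DataFrame df, its column payloads (holding arrays of XML strings), and the exact element type accepted by schema_of_xml_array are assumptions, not confirmed by this article:

import com.databricks.spark.xml.functions.from_xml
import com.databricks.spark.xml.schema_of_xml_array
import spark.implicits._

// Assumes df has a column 'payloads' containing arrays of XML strings
val payloadSchema = schema_of_xml_array(df.select("payloads").as[Seq[String]])
val parsed = df.withColumn("parsed", from_xml($"payloads", payloadSchema))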

Conversion rules

Due to structural differences between DataFrames and XML, there are some conversion rules from XML data to DataFrame and from DataFrame to XML data. You can disable handling attributes with the option excludeAttribute, as shown in the sketch below.
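
A minimal sketch of reading with attribute handling disabled, assuming the same books.xml file; with excludeAttribute set, no attribute-derived fields appear in the inferred schema:

import com.databricks.spark.xml._

// Attributes such as the book id are skipped entirely,
// so no _-prefixed fields appear in the result
val df = spark.read
  .option("rowTag", "book")
  .option("excludeAttribute", true)
  .xml("dbfs:/books.xml")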

Convert XML to DataFrame

  • Attributes: Attributes are converted as fields with the prefix specified in the attributePrefix option. If attributePrefix is _, the document

    <one myOneAttrib="AAAA">
        <two>two</two>
        <three>three</three>
    </one>
    

    produces the schema:

    root
    |-- _myOneAttrib: string (nullable = true)
    |-- two: string (nullable = true)
    |-- three: string (nullable = true)
    
  • If an element has attributes but no child elements, the attribute value is put in a separate field specified in the valueTag option. If valueTag is _VALUE, the document

    <one>
        <two myTwoAttrib="BBBBB">two</two>
        <three>three</three>
    </one>
    

    produces the schema:

    root
    |-- two: struct (nullable = true)
    |    |-- _VALUE: string (nullable = true)
    |    |-- _myTwoAttrib: string (nullable = true)
    |-- three: string (nullable = true)
    

Convert DataFrame to XML

Writing an XML file from a DataFrame that has a field of ArrayType whose element is also ArrayType adds an extra nested field for the element. This does not happen when reading and writing XML data, but it can happen when writing a DataFrame that was read from another source. Therefore, roundtripping (reading and then writing) XML files preserves the structure, but writing a DataFrame read from other sources may produce a different structure.

A DataFrame with the schema:

 |-- a: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)

and data:

+------------------------------------+
|                                   a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+

produces the XML file:

<a>
  <item>aa</item>
</a>
<a>
  <item>bb</item>
</a>