Avro 文件
Apache Avro 是一个数据序列化系统。 Avro 提供:
- 丰富的数据结构。
- 精简、快速的二进制数据格式。
- 一个容器文件,用于存储持久性数据。
- 远程过程调用 (RPC)。
- 与动态语言的简单集成。 不需要进行代码生成便可读取或写入数据文件,也不需要使用或实现 RPC 协议。 代码生成作为一种可选的优化,只值得为静态类型的语言实现。
Avro 数据源支持:
- 架构转换:Apache Spark SQL 与 Avro 记录之间的自动转换。
- 分区:无需任何额外配置即可轻松读取和写入分区的数据。
- 压缩:将 Avro 写入磁盘时要使用的压缩。 支持的类型包括
uncompressed
、snappy
和deflate
。 还可以指定 deflate 级别。 - 记录名称:通过使用
recordName
和recordNamespace
传递参数的映射来记录名称和命名空间。
另请参阅读取和写入流 Avro 数据。
配置
你可以使用各种配置参数更改 Avro 数据源的行为。
若要在读取时忽略没有 .avro
扩展名的文件,可以在 Hadoop 配置中设置参数 avro.mapred.ignore.inputs.without.extension
。 默认值为 false
。
spark
.sparkContext
.hadoopConfiguration
.set("avro.mapred.ignore.inputs.without.extension", "true")
若要配置写入时的压缩,请设置以下 Spark 属性:
- 压缩编解码器:
spark.sql.avro.compression.codec
。 支持的编解码器为snappy
和deflate
。 默认编解码器为snappy
。 - 如果压缩编解码器为
deflate
,则可通过spark.sql.avro.deflate.level
设置压缩级别。 默认级别为-1
。
可以在群集 Spark 配置中或在运行时使用 spark.conf.set()
设置这些属性。 例如:
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
对于 Databricks Runtime 9.1 LTS 及更高版本,你可以通过在读取文件时提供 mergeSchema
选项来更改 Avro 中的默认架构推理行为。 将 mergeSchema
设置为 true
会从目标目录中的一组 Avro 文件推断架构,并将其合并,而不是从单个文件中推断出读取架构。
Avro -> Spark SQL 转换支持的类型
此库支持读取所有 Avro 类型。 它使用下述从 Avro 类型到 Spark SQL 类型的映射:
Avro 类型 | Spark SQL 类型 |
---|---|
boolean | BooleanType |
int | IntegerType |
long | LongType |
FLOAT | FloatType |
double | DoubleType |
字节 | BinaryType |
字符串 | StringType |
记录 (record) | StructType |
enum | StringType |
array | ArrayType |
map | MapType |
fixed | BinaryType |
union | 请参阅联合类型。 |
联合类型
Avro 数据源支持读取 union
类型。 Avro 将以下三种类型视为 union
类型:
union(int, long)
映射到LongType
。union(float, double)
映射到DoubleType
。union(something, null)
,其中something
是任何受支持的 Avro 类型。 这会映射到与something
相同的 Spark SQL 类型,nullable
设置为true
。
所有其他 union
类型都是复杂类型。 它们将映射到 StructType
,其中的字段名称是 member0
、member1
等,与 union
的成员保持一致。 这与在 Avro 和 Parquet 之间进行转换时的行为一致。
逻辑类型
Avro 数据源支持读取以下 Avro 逻辑类型:
Avro 逻辑类型 | Avro 类型 | Spark SQL 类型 |
---|---|---|
date | int | DateType |
timestamp-millis | long | TimestampType |
timestamp-micros | long | TimestampType |
Decimal | fixed | DecimalType |
Decimal | 字节 | DecimalType |
注意
Avro 数据源会忽略 Avro 文件中提供的文档、别名和其他属性。
Spark SQL -> Avro 转换支持的类型
此库支持将所有 Spark SQL 类型写入 Avro。 对于大多数类型,从 Spark 类型到 Avro 类型的映射直接了当(例如 IntegerType
转换为 int
);下面列出了几种特殊情况:
Spark SQL 类型 | Avro 类型 | Avro 逻辑类型 |
---|---|---|
ByteType | int | |
ShortType | int | |
BinaryType | 字节 | |
DecimalType | fixed | Decimal |
TimestampType | long | timestamp-micros |
DateType | int | date |
你还可以通过选项 avroSchema
指定整个输出 Avro 架构,使 Spark SQL 类型可以转换为其他 Avro 类型。
以下转换默认情况下不会应用,需要用户指定的 Avro 架构:
Spark SQL 类型 | Avro 类型 | Avro 逻辑类型 |
---|---|---|
ByteType | fixed | |
StringType | enum | |
DecimalType | 字节 | Decimal |
TimestampType | long | timestamp-millis |
示例
这些示例使用 episodes.avro 文件。
Scala
// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records
val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")
此示例演示了一个自定义 Avro 架构:
import org.apache.avro.Schema
val schema = new Schema.Parser().parse(new File("episode.avsc"))
spark
.read
.format("avro")
.option("avroSchema", schema.toString)
.load("/tmp/episodes.avro")
.show()
此示例演示了 Avro 压缩选项:
// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
val df = spark.read.format("avro").load("/tmp/episodes.avro")
// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")
此示例演示了分区的 Avro 记录:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().master("local").getOrCreate()
val df = spark.createDataFrame(
Seq(
(2012, 8, "Batman", 9.8),
(2012, 8, "Hero", 8.7),
(2012, 7, "Robot", 5.5),
(2011, 7, "Git", 2.0))
).toDF("year", "month", "title", "rating")
df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")
此示例演示了记录名称和命名空间:
val df = spark.read.format("avro").load("/tmp/episodes.avro")
val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)
df.write.options(parameters).format("avro").save("/tmp/output")
Python
# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")
# Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")
SQL
若要采用 SQL 来查询 Avro 数据,请将数据文件注册为表或临时视图:
CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")
SELECT * from episodes
笔记本示例:读取和写入 Avro 文件
以下笔记本演示了如何读取和写入 Avro 文件。