Apache Avro 是一个数据序列化系统。 Avro 提供:
- 丰富的数据结构。
- 精简、快速的二进制数据格式。
- 一个容器文件,用于存储持久性数据。
- 远程过程调用 (RPC)。
- 与动态语言的简单集成。 不需要进行代码生成便可读取或写入数据文件,也不需要使用或实现 RPC 协议。 代码生成作为一种可选的优化,只值得为静态类型的语言实现。
Avro 数据源支持:
- 架构转换:Apache Spark SQL 与 Avro 记录之间的自动转换。
- 分区:无需任何额外配置即可轻松读取和写入分区的数据。
- 压缩:将 Avro 写入磁盘时要使用的压缩。 支持的类型包括
uncompressed、snappy和deflate。 还可以指定 deflate 级别。 - 记录名称:通过使用
recordName和recordNamespace传递参数的映射来记录名称和命名空间。
另请参阅读取和写入流 Avro 数据。
配置
你可以使用各种配置参数更改 Avro 数据源的行为。
若要在读取时忽略没有 .avro 扩展名的文件,可以在 Hadoop 配置中设置参数 avro.mapred.ignore.inputs.without.extension。 默认值为 false。
spark
.sparkContext
.hadoopConfiguration
.set("avro.mapred.ignore.inputs.without.extension", "true")
若要配置写入时的压缩,请设置以下 Spark 属性:
- 压缩编解码器:
spark.sql.avro.compression.codec。 支持的编解码器为snappy和deflate。 默认编解码器为snappy。 - 如果压缩编解码器为
deflate,则可通过spark.sql.avro.deflate.level设置压缩级别。 默认级别为-1。
可以在群集 Spark 配置中或在运行时使用 spark.conf.set() 设置这些属性。 例如:
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
对于 Databricks Runtime 9.1 LTS 及更高版本,你可以通过在读取文件时提供 mergeSchema 选项来更改 Avro 中的默认架构推理行为。 将 mergeSchema 设置为 true 会从目标目录中的一组 Avro 文件推断架构,并将其合并,而不是从单个文件中推断出读取架构。
Avro -> Spark SQL 转换支持的类型
此库支持读取所有 Avro 类型。 它使用下述从 Avro 类型到 Spark SQL 类型的映射:
| Avro 类型 | Spark SQL 类型 |
|---|---|
| 布尔 | BooleanType |
| 整数 (int) | 整数类型 |
| 长整型 | LongType |
| FLOAT | 浮点类型 |
| 翻倍 | DoubleType |
| 字节 | 二进制类型 |
| 字符串 | 字符串类型 |
| 记录 (record) | 结构类型 |
| 枚举 | 字符串类型 |
| 数组 | 数组类型 (ArrayType) |
| 地图 | 地图类型 |
| 固定 | 二进制类型 |
| 联盟 | 请参阅联合类型。 |
联合类型
Avro 数据源支持读取 union 类型。 Avro 将以下三种类型视为 union 类型:
-
union(int, long)映射到LongType。 -
union(float, double)映射到DoubleType。 -
union(something, null),其中something是任何受支持的 Avro 类型。 这会映射到与something相同的 Spark SQL 类型,nullable设置为true。
所有其他 union 类型都是复杂类型。 它们将映射到 StructType,其中的字段名称是 member0、member1 等,与 union 的成员保持一致。 这与在 Avro 和 Parquet 之间进行转换时的行为一致。
逻辑类型
Avro 数据源支持读取以下 Avro 逻辑类型:
| Avro 逻辑类型 | Avro 类型 | Spark SQL 类型 |
|---|---|---|
| 日期 | 整数 (int) | 日期类型 |
| 时间戳-毫秒 | 长整型 | 时间戳类型 |
| 时间戳-微秒 | 长整型 | 时间戳类型 |
| Decimal | 固定 | 十进制类型 |
| Decimal | 字节 | 十进制类型 |
注意
Avro 数据源会忽略 Avro 文件中提供的文档、别名和其他属性。
Spark SQL -> Avro 转换支持的类型
此库支持将所有 Spark SQL 类型写入 Avro。 对于大多数类型,从 Spark 类型到 Avro 类型的映射直接了当(例如 IntegerType 转换为 int);下面列出了几种特殊情况:
| Spark SQL 类型 | Avro 类型 | Avro 逻辑类型 |
|---|---|---|
| 字节类型 | 整数 (int) | |
| ShortType | 整数 (int) | |
| 二进制类型 | 字节 | |
| 十进制类型 | 固定 | Decimal |
| 时间戳类型 | 长整型 | 时间戳-微秒 |
| 日期类型 | 整数 (int) | 日期 |
你还可以通过选项 avroSchema 指定整个输出 Avro 架构,使 Spark SQL 类型可以转换为其他 Avro 类型。
以下转换默认情况下不会应用,需要用户指定的 Avro 架构:
| Spark SQL 类型 | Avro 类型 | Avro 逻辑类型 |
|---|---|---|
| 字节类型 | 固定 | |
| 字符串类型 | 枚举 | |
| 十进制类型 | 字节 | Decimal |
| 时间戳类型 | 长整型 | 时间戳-毫秒 |
示例
这些示例使用 episodes.avro 文件。
Scala(编程语言)
// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records
val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")
此示例演示了一个自定义 Avro 架构:
import org.apache.avro.Schema
val schema = new Schema.Parser().parse(new File("episode.avsc"))
spark
.read
.format("avro")
.option("avroSchema", schema.toString)
.load("/tmp/episodes.avro")
.show()
此示例演示了 Avro 压缩选项:
// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
val df = spark.read.format("avro").load("/tmp/episodes.avro")
// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")
此示例演示了分区的 Avro 记录:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().master("local").getOrCreate()
val df = spark.createDataFrame(
Seq(
(2012, 8, "Batman", 9.8),
(2012, 8, "Hero", 8.7),
(2012, 7, "Robot", 5.5),
(2011, 7, "Git", 2.0))
).toDF("year", "month", "title", "rating")
df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")
此示例演示了记录名称和命名空间:
val df = spark.read.format("avro").load("/tmp/episodes.avro")
val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)
df.write.options(parameters).format("avro").save("/tmp/output")
Python语言
# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")
# Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")
SQL
若要采用 SQL 来查询 Avro 数据,请将数据文件注册为表或临时视图:
CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")
SELECT * from episodes
笔记本示例:读取和写入 Avro 文件
以下笔记本演示了如何读取和写入 Avro 文件。