Avro file
Apache Avro is a data serialization system. Avro provides:

- Rich data structures.
- A compact, fast, binary data format.
- A container file, to store persistent data.
- Remote procedure call (RPC).
- Simple integration with dynamic languages. Code generation is not required to read or write data files nor to use or implement RPC protocols. Code generation is an optional optimization, only worth implementing for statically typed languages.
The Avro data source supports:

- Schema conversion: Automatic conversion between Apache Spark SQL and Avro records.
- Partitioning: Easily reading and writing partitioned data without any extra configuration.
- Compression: Compression to use when writing Avro out to disk. The supported types are `uncompressed`, `snappy`, and `deflate`. You can also specify the deflate level.
- Record names: Record name and namespace by passing a map of parameters with `recordName` and `recordNamespace`.
Also see Read and write streaming Avro data.

Configuration

You can change the behavior of an Avro data source using various configuration parameters.
To ignore files without the `.avro` extension when reading, you can set the parameter `avro.mapred.ignore.inputs.without.extension` in the Hadoop configuration. The default is `false`.
```scala
spark
  .sparkContext
  .hadoopConfiguration
  .set("avro.mapred.ignore.inputs.without.extension", "true")
```
To configure compression when writing, set the following Spark properties:

- Compression codec: `spark.sql.avro.compression.codec`. Supported codecs are `snappy` and `deflate`. The default codec is `snappy`.
- If the compression codec is `deflate`, you can set the compression level with `spark.sql.avro.deflate.level`. The default level is `-1`.
You can set these properties in the cluster Spark configuration or at runtime using `spark.conf.set()`. For example:

```scala
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
```
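The deflate level follows the usual zlib convention: 1 (fastest) through 9 (best compression), with -1 selecting the library's default trade-off. A quick sketch using Python's `zlib` module illustrates the trade-off in general terms (this is not Spark API code, just an illustration of deflate levels):

```python
import zlib

# Illustration of deflate compression levels: higher levels trade
# speed for smaller output; -1 picks the library default.
data = b"avro avro avro " * 200

fast = zlib.compress(data, 1)      # level 1: fastest, typically larger output
best = zlib.compress(data, 9)      # level 9: slowest, typically smallest output
default = zlib.compress(data, -1)  # level -1: the library's default choice

assert zlib.decompress(best) == data  # compression is lossless at any level
```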
Supported types for Avro -> Spark SQL conversion

This library supports reading all Avro types. It uses the following mapping from Avro types to Spark SQL types:
| Avro type | Spark SQL type |
|---|---|
| boolean | BooleanType |
| int | IntegerType |
| long | LongType |
| float | FloatType |
| double | DoubleType |
| bytes | BinaryType |
| string | StringType |
| record | StructType |
| enum | StringType |
| array | ArrayType |
| map | MapType |
| fixed | BinaryType |
| union | See Union types. |
Union types

The Avro data source supports reading `union` types. Avro considers the following three types to be `union` types:

- `union(int, long)` maps to `LongType`.
- `union(float, double)` maps to `DoubleType`.
- `union(something, null)`, where `something` is any supported Avro type. This maps to the same Spark SQL type as that of `something`, with `nullable` set to `true`.
All other `union` types are complex types. They map to `StructType` where field names are `member0`, `member1`, and so on, in accordance with the members of the `union`. This is consistent with the behavior when converting between Avro and Parquet.
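The union-resolution rules above can be summarized as a small sketch. This is a hypothetical helper written in Python for illustration, not part of the Spark API; the type names are the Spark SQL class names from the mapping table above:

```python
# Hypothetical illustration of the documented union-resolution rules.
def avro_to_spark(avro_type):
    """Primitive-type mapping from the Avro -> Spark SQL table."""
    return {
        "boolean": "BooleanType", "int": "IntegerType", "long": "LongType",
        "float": "FloatType", "double": "DoubleType", "bytes": "BinaryType",
        "string": "StringType",
    }[avro_type]

def resolve_union(members):
    """Map an Avro union's member types to a Spark SQL type description."""
    s = set(members)
    if s == {"int", "long"}:
        return "LongType"                 # union(int, long) -> LongType
    if s == {"float", "double"}:
        return "DoubleType"               # union(float, double) -> DoubleType
    if "null" in s and len(s) == 2:
        # union(something, null): same type as `something`, nullable = true
        (something,) = s - {"null"}
        return f"{avro_to_spark(something)} (nullable)"
    # All other unions become a StructType with fields member0, member1, ...
    fields = ", ".join(
        f"member{i}: {avro_to_spark(t)}" for i, t in enumerate(members))
    return f"StructType({fields})"
```

For example, `resolve_union(["string", "int"])` yields a struct with fields `member0` and `member1`, matching the complex-union behavior described above.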
Logical types

The Avro data source supports reading the following Avro logical types:
| Avro logical type | Avro type | Spark SQL type |
|---|---|---|
| date | int | DateType |
| timestamp-millis | long | TimestampType |
| timestamp-micros | long | TimestampType |
| decimal | fixed | DecimalType |
| decimal | bytes | DecimalType |

Note

The Avro data source ignores docs, aliases, and other properties present in the Avro file.
Supported types for Spark SQL -> Avro conversion

This library supports writing of all Spark SQL types into Avro. For most types, the mapping from Spark types to Avro types is straightforward (for example, `IntegerType` gets converted to `int`); the following is a list of the few special cases:
| Spark SQL type | Avro type | Avro logical type |
|---|---|---|
| ByteType | int | |
| ShortType | int | |
| BinaryType | bytes | |
| DecimalType | fixed | decimal |
| TimestampType | long | timestamp-micros |
| DateType | int | date |
You can also specify the whole output Avro schema with the option `avroSchema`, so that Spark SQL types can be converted into other Avro types.
The following conversions are not applied by default and require a user-specified Avro schema:
| Spark SQL type | Avro type | Avro logical type |
|---|---|---|
| ByteType | fixed | |
| StringType | enum | |
| DecimalType | bytes | decimal |
| TimestampType | long | timestamp-millis |
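For instance, writing a `StringType` column as an Avro `enum` requires passing a full output schema through the `avroSchema` option. A minimal sketch of such a schema, built as JSON in Python (the record, field, and symbol names here are hypothetical, chosen for illustration only):

```python
import json

# Hypothetical output schema: writes a string column "color" as an
# Avro enum instead of the default string type. The serialized JSON
# would be passed as the "avroSchema" option on the DataFrame writer.
schema = {
    "type": "record",
    "name": "AvroTest",
    "namespace": "org.foo",
    "fields": [
        {
            "name": "color",
            "type": {
                "type": "enum",
                "name": "Color",
                "symbols": ["red", "green", "blue"],
            },
        }
    ],
}
avro_schema_option = json.dumps(schema)
```

With Spark, this string would be supplied as `.option("avroSchema", avro_schema_option)` on the writer; the column values must match the declared enum symbols.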
Examples

These examples use the episodes.avro file.

Scala
```scala
// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records
val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")
```
This example demonstrates a custom Avro schema:
```scala
import java.io.File

import org.apache.avro.Schema

val schema = new Schema.Parser().parse(new File("episode.avsc"))

spark
  .read
  .format("avro")
  .option("avroSchema", schema.toString)
  .load("/tmp/episodes.avro")
  .show()
```
This example demonstrates Avro compression options:
```scala
// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

val df = spark.read.format("avro").load("/tmp/episodes.avro")

// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")
```
This example demonstrates partitioned Avro records:
```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()

val df = spark.createDataFrame(
  Seq(
    (2012, 8, "Batman", 9.8),
    (2012, 8, "Hero", 8.7),
    (2012, 7, "Robot", 5.5),
    (2011, 7, "Git", 2.0))
).toDF("year", "month", "title", "rating")

df.write.format("avro").partitionBy("year", "month").save("/tmp/output")
```
This example demonstrates the record name and namespace:
```scala
val df = spark.read.format("avro").load("/tmp/episodes.avro")

val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)

df.write.options(parameters).format("avro").save("/tmp/output")
```
Python

```python
# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")

# Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")
```
SQL

To query Avro data in SQL, register the data file as a table or temporary view:

```sql
CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")

SELECT * FROM episodes
```
Notebook

The following notebook demonstrates how to read and write Avro files.