Avro file
Apache Avro is a data serialization system. Avro provides:
- Rich data structures.
- A compact, fast, binary data format.
- A container file, to store persistent data.
- Remote procedure call (RPC).
- Simple integration with dynamic languages. Code generation is not required to read or write data files, nor to use or implement RPC protocols. Code generation is an optional optimization, worth implementing only for statically typed languages.
The Avro data source supports:
- Schema conversion: Automatic conversion between Apache Spark SQL and Avro records.
- Partitioning: Easily reading and writing partitioned data without any extra configuration.
- Compression: Compression to use when writing Avro out to disk. The supported types are uncompressed, snappy, and deflate. You can also specify the deflate level.
- Record names: Record name and namespace by passing a map of parameters with recordName and recordNamespace.
Also see Read and write streaming Avro data.
Configuration
You can change the behavior of an Avro data source using various configuration parameters.
To ignore files without the .avro extension when reading, you can set the parameter avro.mapred.ignore.inputs.without.extension in the Hadoop configuration. The default is false.
spark
  .sparkContext
  .hadoopConfiguration
  .set("avro.mapred.ignore.inputs.without.extension", "true")
To configure compression when writing, set the following Spark properties:
- Compression codec: spark.sql.avro.compression.codec. Supported codecs are snappy and deflate. The default codec is snappy.
- If the compression codec is deflate, you can set the compression level with spark.sql.avro.deflate.level. The default level is -1.
You can set these properties in the cluster Spark configuration or at runtime using spark.conf.set(). For example:
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
For Databricks Runtime 9.1 LTS and above, you can change the default schema inference behavior in Avro by providing the mergeSchema option when reading files. Setting mergeSchema to true infers a schema from the set of Avro files in the target directory and merges them, rather than inferring the read schema from a single file.
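The following is a minimal PySpark sketch of reading with the mergeSchema option. It assumes an active SparkSession is passed in as spark; the helper name read_avro_merged and the directory path are hypothetical and used only for illustration.

```python
def read_avro_merged(spark, path):
    """Read all Avro files under `path` with schema merging enabled.

    `spark` is assumed to be an active SparkSession on Databricks Runtime
    9.1 LTS or above; `path` is a hypothetical directory of Avro files.
    """
    return (
        spark.read.format("avro")
        .option("mergeSchema", "true")  # infer from the set of files and merge
        .load(path)
    )
```

For example, read_avro_merged(spark, "/tmp/avro-dir") would return a DataFrame whose schema is merged from the files in that directory.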
Supported types for Avro -> Spark SQL conversion
This library supports reading all Avro types. It uses the following mapping from Avro types to Spark SQL types:
Avro type | Spark SQL type |
---|---|
boolean | BooleanType |
int | IntegerType |
long | LongType |
float | FloatType |
double | DoubleType |
bytes | BinaryType |
string | StringType |
record | StructType |
enum | StringType |
array | ArrayType |
map | MapType |
fixed | BinaryType |
union | See Union types. |
Union types
The Avro data source supports reading union types. Avro considers the following three types to be union types:
- union(int, long) maps to LongType.
- union(float, double) maps to DoubleType.
- union(something, null), where something is any supported Avro type. This maps to the same Spark SQL type as that of something, with nullable set to true.
All other union types are complex types. They map to StructType where field names are member0, member1, and so on, in accordance with members of the union. This is consistent with the behavior when converting between Avro and Parquet.
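The union rules in this section can be sketched as a small pure-Python function. This is an illustration only, not the data source's implementation: the function name spark_type_for_union is hypothetical, it handles only primitive member types, and its treatment of unions that combine null with more than one other type (drop null, then apply the remaining rules with nullable set to true) is an assumption.

```python
# Avro primitive types and their Spark SQL types, taken from the mapping table.
PRIMITIVES = {
    "boolean": "BooleanType", "int": "IntegerType", "long": "LongType",
    "float": "FloatType", "double": "DoubleType", "bytes": "BinaryType",
    "string": "StringType",
}

def spark_type_for_union(members):
    """Return (spark_type_name, nullable) for a union of Avro primitive names."""
    non_null = [m for m in members if m != "null"]
    nullable = len(non_null) < len(members)  # a null member makes it nullable
    if set(non_null) == {"int", "long"}:
        return "LongType", nullable
    if set(non_null) == {"float", "double"}:
        return "DoubleType", nullable
    if len(non_null) == 1:
        # union(something, null): same type as `something`
        return PRIMITIVES[non_null[0]], nullable
    # Any other union becomes a struct with fields member0, member1, ...
    fields = ", ".join(
        f"member{i}: {PRIMITIVES[m]}" for i, m in enumerate(non_null)
    )
    return f"StructType({fields})", nullable
```

Calling spark_type_for_union(["string", "null"]) yields StringType with nullable set to true, while spark_type_for_union(["int", "string"]) yields a struct with the fields member0 and member1.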
Logical types
The Avro data source supports reading the following Avro logical types:
Avro logical type | Avro type | Spark SQL type |
---|---|---|
date | int | DateType |
timestamp-millis | long | TimestampType |
timestamp-micros | long | TimestampType |
decimal | fixed | DecimalType |
decimal | bytes | DecimalType |
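To make the table concrete, the sketch below decodes the raw Avro values behind each logical type using only the Python standard library. It follows the definitions in the Avro specification (days or milliseconds/microseconds since the Unix epoch, and a big-endian two's-complement unscaled integer for decimal). The helper names are hypothetical, and this is not how the Avro data source performs the conversion internally.

```python
from datetime import date, datetime, timedelta, timezone
from decimal import Decimal

EPOCH_DATE = date(1970, 1, 1)
EPOCH_TS = datetime(1970, 1, 1, tzinfo=timezone.utc)

def decode_date(days):
    """date (int): number of days since 1970-01-01."""
    return EPOCH_DATE + timedelta(days=days)

def decode_timestamp_millis(millis):
    """timestamp-millis (long): milliseconds since the epoch, UTC."""
    return EPOCH_TS + timedelta(milliseconds=millis)

def decode_timestamp_micros(micros):
    """timestamp-micros (long): microseconds since the epoch, UTC."""
    return EPOCH_TS + timedelta(microseconds=micros)

def decode_decimal(raw, scale):
    """decimal (bytes or fixed): big-endian two's-complement unscaled value."""
    unscaled = int.from_bytes(raw, byteorder="big", signed=True)
    return Decimal(unscaled).scaleb(-scale)
```

Both timestamp logical types carry the same kind of value at different resolutions, which is why they both map to TimestampType.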
Note
The Avro data source ignores docs, aliases, and other properties present in the Avro file.
Supported types for Spark SQL -> Avro conversion
This library supports writing of all Spark SQL types into Avro. For most types, the mapping from Spark types to Avro types is straightforward (for example, IntegerType gets converted to int); the following is a list of the few special cases:
Spark SQL type | Avro type | Avro logical type |
---|---|---|
ByteType | int | |
ShortType | int | |
BinaryType | bytes | |
DecimalType | fixed | decimal |
TimestampType | long | timestamp-micros |
DateType | int | date |
You can also specify the whole output Avro schema with the option avroSchema, so that Spark SQL types can be converted into other Avro types.
The following conversions are not applied by default and require a user-specified Avro schema:
Spark SQL type | Avro type | Avro logical type |
---|---|---|
ByteType | fixed | |
StringType | enum | |
DecimalType | bytes | decimal |
TimestampType | long | timestamp-millis |
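As a hedged illustration of the last row of the table, the PySpark sketch below passes a user-specified schema through the avroSchema option so that a TimestampType column is written as timestamp-millis rather than the default timestamp-micros. The record name, namespace, column name event_time, and helper name are all hypothetical, and the union with null assumes the DataFrame column is nullable.

```python
import json

# Hypothetical output schema: one nullable timestamp column, "event_time".
AVRO_SCHEMA = json.dumps({
    "type": "record",
    "name": "Event",
    "namespace": "org.example",
    "fields": [
        {
            "name": "event_time",
            "type": ["null", {"type": "long", "logicalType": "timestamp-millis"}],
        },
    ],
})

def write_with_millis(df, path):
    """Write `df` as Avro using the user-specified schema above.

    `df` is assumed to be a DataFrame with a single TimestampType column
    named event_time; `path` is a hypothetical output location.
    """
    df.write.format("avro").option("avroSchema", AVRO_SCHEMA).save(path)
```

The schema is passed as a JSON string, which matches how the Scala example in this article passes schema.toString.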
Examples
These examples use the episodes.avro file.
Scala
// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records
val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")
This example demonstrates a custom Avro schema:
import java.io.File
import org.apache.avro.Schema

// Parse the Avro schema from a local .avsc file
val schema = new Schema.Parser().parse(new File("episode.avsc"))
spark
  .read
  .format("avro")
  .option("avroSchema", schema.toString)
  .load("/tmp/episodes.avro")
  .show()
This example demonstrates Avro compression options:
// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
val df = spark.read.format("avro").load("/tmp/episodes.avro")
// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")
This example demonstrates partitioned Avro records:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()

val df = spark.createDataFrame(
  Seq(
    (2012, 8, "Batman", 9.8),
    (2012, 8, "Hero", 8.7),
    (2012, 7, "Robot", 5.5),
    (2011, 7, "Git", 2.0))
).toDF("year", "month", "title", "rating")

// Write the records partitioned by year and month
df.write.format("avro").partitionBy("year", "month").save("/tmp/output")
This example demonstrates the record name and namespace:
val df = spark.read.format("avro").load("/tmp/episodes.avro")
val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)
df.write.options(parameters).format("avro").save("/tmp/output")
Python
# Create a DataFrame from the Avro file at the specified path
df = spark.read.format("avro").load("/tmp/episodes.avro")
# Save the subset of the Avro records that match the filter
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")
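The partitioning and record-name options shown in the Scala examples can be combined in Python along the following lines. This is a sketch under stated assumptions: the helper name write_partitioned is hypothetical, and df is assumed to be a DataFrame that has year and month columns.

```python
def write_partitioned(df, path, record_name="AvroTest", record_namespace="org.foo"):
    """Write `df` as Avro, partitioned by year and month.

    `df` is assumed to contain `year` and `month` columns; the record name
    and namespace default to the values used in the Scala example.
    """
    (
        df.write.format("avro")
        .options(recordName=record_name, recordNamespace=record_namespace)
        .partitionBy("year", "month")
        .save(path)
    )
```

For example, write_partitioned(df, "/tmp/output") would write the DataFrame under /tmp/output, partitioned by the two columns.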
SQL
To query Avro data in SQL, register the data file as a table or temporary view:
CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro");

SELECT * FROM episodes;