Read and write streaming Avro data

Apache Avro is a commonly used data serialization system in the streaming world. A typical solution is to put data in Avro format in Apache Kafka, metadata in Confluent Schema Registry, and then run queries with a streaming framework that connects to both Kafka and Schema Registry.

Azure Databricks supports the from_avro and to_avro functions to build streaming pipelines with Avro data in Kafka and metadata in Schema Registry. The function to_avro encodes a column as binary in Avro format, and from_avro decodes Avro binary data into a column. Both functions transform one column into another column, and the input/output SQL data type can be a complex type or a primitive type.

Note

The from_avro and to_avro functions:

  • Are available in Python, Scala, and Java.
  • Can be passed to SQL functions in both batch and streaming queries, as the batch sketch after this note shows.
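For example, the following minimal sketch runs as a batch query and round-trips a struct column (a complex type) through Avro binary. The in-memory data and column names are hypothetical, and it assumes a session with spark.implicits in scope:

import org.apache.spark.sql.functions.struct
import org.apache.spark.sql.avro.functions._

// Encode a struct column (a complex type) to Avro binary in a batch query.
// No schema is needed on the encoding side.
val events = Seq((1, "open"), (2, "close")).toDF("id", "action")
val encoded = events.select(to_avro(struct($"id", $"action")).as("value"))

// Decoding requires the writer schema as a JSON string. By default the
// generated record is named "topLevelRecord" with no namespace, and the
// nullable string column maps to a union with "null".
val avroSchema = """
  {"type": "record", "name": "topLevelRecord", "fields": [
    {"name": "id", "type": "int"},
    {"name": "action", "type": ["string", "null"]}]}"""
val decoded = encoded.select(from_avro($"value", avroSchema).as("event"))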

Also see Avro file data source.

Basic example

Similar to from_json and to_json, you can use from_avro and to_avro with any binary column, but you must specify the Avro schema manually.

import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder

// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
    from_avro($"value", SchemaBuilder.builder().intType()).as("value"))

// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
  .select(
    to_avro($"key").as("key"),
    to_avro($"value").as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("article", "t")
  .save()

jsonFormatSchema example

You can also specify a schema as a JSON string. For example, if /tmp/user.avsc is:

{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favorite_color", "type": ["string", "null"]}
  ]
}

You can read that file into a JSON string:

from pyspark.sql.avro.functions import from_avro, to_avro

jsonFormatSchema = open("/tmp/user.avsc", "r").read()

Then use the schema in from_avro:

# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.

output = df\
  .select(from_avro("value", jsonFormatSchema).alias("user"))\
  .where('user.favorite_color == "red"')\
  .select(to_avro("user.name").alias("value"))
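Since the Schema Registry examples that follow are Scala-only, here is a sketch of the same pipeline in Scala; it assumes the same /tmp/user.avsc file and that df is the Kafka source DataFrame from the basic example:

import scala.io.Source
import org.apache.spark.sql.avro.functions._

// Read the Avro schema file into a JSON string.
val jsonFormatSchema = Source.fromFile("/tmp/user.avsc").mkString

// 1. Decode the Avro data into a struct.
// 2. Filter by column "favorite_color".
// 3. Encode the column "name" in Avro format.
val output = df
  .select(from_avro($"value", jsonFormatSchema).as("user"))
  .where("user.favorite_color == 'red'")
  .select(to_avro($"user.name").as("value"))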

Example with Schema Registry

If your cluster has a Schema Registry service, from_avro can work with it so that you don't need to specify the Avro schema manually.

Note

Integration with Schema Registry is available only in Scala and Java.

import org.apache.spark.sql.avro.functions._

// Read a Kafka topic "t", assuming the key and value are already
// registered in Schema Registry as subjects "t-key" and "t-value" of type
// string and int. The binary key and value columns are turned into string
// and int type with Avro and Schema Registry. The schema of the resulting DataFrame
// is: <key: string, value: int>.
val schemaRegistryAddr = "https://myhost:8081"
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr).as("value"))

For to_avro, the default output Avro schema might not match the schema of the target subject in the Schema Registry service for the following reasons:

  • The mapping from Spark SQL type to Avro schema is not one-to-one. See Supported types for Spark SQL -> Avro conversion.
  • If the converted output Avro schema is of record type, the record name is topLevelRecord and there is no namespace by default, as the sketch below shows.
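Both effects can be seen by converting a Spark SQL type with SchemaConverters, the converter Spark uses internally; a minimal sketch:

import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types._

// Convert a Spark SQL struct type to its default Avro schema. The record
// is named "topLevelRecord" with no namespace, and the nullable column
// becomes a union with "null" -- this may not match the target subject.
val sqlType = new StructType()
  .add("name", StringType, nullable = false)
  .add("favorite_color", StringType, nullable = true)

val avroType = SchemaConverters.toAvroType(sqlType)
println(avroType.toString(true))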

If the default output schema of to_avro matches the schema of the target subject, you can do the following:

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("topic", "t")
  .start()

Otherwise, you must provide the schema of the target subject in the to_avro function:

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("topic", "t")
  .start()