读取和写入流 Avro 数据

Apache Avro 是流式处理领域中常用的数据序列化系统。 典型的解决方案是将数据以 Avro 格式放在 Apache Kafka 中,将元数据放在 Confluent 架构注册表中,然后使用同时连接到 Kafka 和架构注册表的流式处理框架运行查询。

Azure Databricks 支持 from_avroto_avro 函数,允许使用 Kafka 中的 Avro 数据和架构注册表中的元数据来构建流式处理管道。 函数 to_avro 将列编码为 Avro 格式的二进制数据,而 from_avro 将 Avro 二进制数据解码为列。 这两个函数都将一个列转换为另一个列,而输入/输出 SQL 数据类型可以是复杂类型或基元类型。

注意

from_avroto_avro 函数:

  • Python、Scala 和 Java 中可用。
  • 可以在批中和流式处理查询中传递到 SQL 函数。

另请参阅 Avro 文件数据源

手动指定的架构示例

from_jsonto_json 类似,可以将 from_avroto_avro 用于任何二进制列。 可以手动指定 Avro 架构,如以下示例所示:

import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder

// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
    from_avro($"value", SchemaBuilder.builder().intType()).as("value"))

// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
  .select(
    to_avro($"key").as("key"),
    to_avro($"value").as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("topic", "t")
  .start()

jsonFormatSchema 示例

还可以 JSON 字符串的形式指定架构。 例如,如果 /tmp/user.avsc 为:

{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favorite_color", "type": ["string", "null"]}
  ]
}

可以创建一个 JSON 字符串:

from pyspark.sql.avro.functions import from_avro, to_avro

jsonFormatSchema = open("/tmp/user.avsc", "r").read()

然后在 from_avro 中使用该架构:

# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.

output = df\
  .select(from_avro("value", jsonFormatSchema).alias("user"))\
  .where('user.favorite_color == "red"')\
  .select(to_avro("user.name").alias("value"))

使用架构注册表的示例

如果群集具有架构注册表服务,则 from_avro 可以使用该服务,这样你就无需手动指定 Avro 架构。

以下示例演示如何读取 Kafka 主题“t”,假设键和值已在架构注册表中注册为类型为 STRINGINT 的主题“t-key”和“t-value”:

import org.apache.spark.sql.avro.functions._

val schemaRegistryAddr = "https://myhost:8081"
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr).as("value"))

对于 to_avro,默认输出 Avro 架构可能与架构注册表服务中目标使用者的架构不匹配,原因如下:

  • 从 Spark SQL 类型到 Avro 架构的映射不是一对一。 请参阅 Spark SQL -> Avro 转换支持的类型
  • 如果转换后的输出 Avro 模式是记录类型,则记录名称为 topLevelRecord,默认情况下没有命名空间。

如果 to_avro 的默认输出架构与目标使用者的架构匹配,则可执行以下代码:

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

否则,必须在 to_avro 函数中提供目标使用者的架构:

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

向外部 Confluent 架构注册表进行身份验证

在 Databricks Runtime 12.2 LTS 及更高版本中,可以向外部 Confluent 架构注册表进行身份验证。 以下示例演示如何将注册表选项配置为包含身份验证凭据和 API 密钥。

Scala

import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret")

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

Python

from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro

schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
  "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
  "confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}"
}

df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(
      data = col("key"),
      options = schema_registry_options,
      subject = "t-key",
      schemaRegistryAddress = schema_registry_address
    ).alias("key"),
    from_avro(
      data = col("value"),
      options = schema_registry_options,
      subject = "t-value",
      schemaRegistryAddress = schema_registry_address
    ).alias("value")
  )
)

# The converted data is saved to Kafka as a Kafka topic "t".
data_df
  .select(
    to_avro(
      data = col("key"),
      subject = lit("t-key"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("key"),
    to_avro(
      data = col("value"),
      subject = lit("t-value"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("value")
  )
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

# The Avro schema of subject "t-value" in JSON string format.
avro_schema = ...

# The converted data is saved to Kafka as a Kafka topic "t".
data_df
  .select(
    to_avro(
      data = col("key"),
      subject = lit("t-key"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("key"),
    to_avro(
      data = col("value"),
      subject = lit("t-value"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options,
      jsonFormatSchema = avro_schema).alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

Unity 目录卷中使用信任存储和密钥存储文件

在 Databricks Runtime 14.3 LTS 及更高版本中,可以使用 Unity Catalog 卷中的信任存储和密钥存储文件向 Confluent 架构注册表进行身份验证。 使用以下语法更新上述示例中的配置:

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "truststorePassword",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/keystore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "keystorePassword",
      "confluent.schema.registry.ssl.key.password" -> "keyPassword")

将架构演变模式与 from_avro 配合使用

在 Databricks Runtime 14.2 及更高版本中,可以将架构演变模式用于 from_avro。 启用架构演变模式会导致作业在检测架构演变后引发 UnknownFieldException。 Databricks 建议配置具有架构演变模式的作业,以便在任务失败时自动重启。 将结构化流式处理作业配置为在失败时重启流式处理查询

如果希望源数据的架构随着时间的推移而演变并引入数据源中的所有字段,则架构演变非常有用。 如果查询已显式指定要在数据源中查询的字段,则无论架构演变如何,都会忽略添加的字段。

使用 avroSchemaEvolutionMode 选项启用架构演变。 下表描述了架构演变模式的选项:

选项 行为
none 默认。 忽略架构演变,作业继续。
restart 检测架构演变时引发 UnknownFieldException。 需要重启作业。

注意

可以在流式处理作业之间更改此配置,并重复使用相同的检查点。 禁用架构演变可能会导致删除列。

配置分析模式

可以配置分析模式,以确定在禁用架构演变模式时是要失败还是发出 null 记录,架构以非向后兼容的方式发展。 使用默认设置时,from_avro 在观察到不兼容的架构更改时失败。

使用 mode 选项指定分析模式。 下表对这些选项进行说明:

选项 行为
FAILFAST 默认。 分析错误会引发具有 MALFORMED_AVRO_MESSAGEerrorClassSparkException
PERMISSIVE 将忽略分析错误并发出 null 记录。

注意

启用架构演变后,FAILFAST 仅在记录损坏时引发异常。

使用架构演变和设置分析模式的示例

以下示例演示如何使用 Confluent 架构注册表启用架构演变并指定 FAILFAST 分析模式:

Scala

import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret",
      "avroSchemaEvolutionMode" -> "restart",
      "mode" -> "FAILFAST")

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    // We read the "key" binary column from the subject "t-key" in the schema
    // registry at schemaRegistryAddr. We provide schemaRegistryOptions,
    // which has avroSchemaEvolutionMode -> "restart". This instructs from_avro
    // to fail the query if the schema for the subject t-key evolves.
    from_avro(
            $"key",
            "t-key",
            schemaRegistryAddr,
            schemaRegistryOptions.asJava).as("key"))

Python

from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro

schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
  "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
  "confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}",
  "avroSchemaEvolutionMode": "restart",
  "mode": "FAILFAST",
}

df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(
      data = col("key"),
      options = schema_registry_options,
      subject = "t-key",
      schemaRegistryAddress = schema_registry_address
    ).alias("key")
  )
)