读取和写入协议缓冲区

Azure Databricks 原生支持 Apache Spark 结构和协议缓冲区 (protobuf) 之间的序列化和反序列化。 Protobuf 支持作为 Apache Spark 数据帧转换器实现,可以与结构化流式处理配合使用或用于批处理操作。

如何反序列化和序列化协议缓冲区

在 Databricks Runtime 12.2 LTS 及更高版本中,可以使用 from_protobufto_protobuf 函数来序列化和反序列化数据。 Protobuf 序列化通常用于流式处理工作负载。

protobuf 函数的基本语法与读取和写入函数类似。 在使用之前,必须导入这些函数。

from_protobuf 将二进制列强制转换为结构,并将 to_protobuf 结构列强制转换为二进制。 必须提供使用 options 参数指定的架构注册表或由 descFilePath 参数标识的描述符文件。

Python

from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

Scala

// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

以下示例演示如何使用 from_protobuf() 处理二进制 protobuf 记录,以及使用 to_protobuf() 将 Spark SQL 结构转换为二进制 protobuf。

将 protobuf 与 Confluent 架构注册表配合使用

Azure Databricks 支持使用 Confluent 架构注册表来定义 Protobuf。

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf

schema_registry_options = {
  "schema.registry.subject" : "app-events-value",
  "schema.registry.address" : "https://schema-registry:8081/"
}

# Convert binary Protobuf to SQL struct with from_protobuf():
proto_events_df = (
  input_df
    .select(
      from_protobuf("proto_bytes", options = schema_registry_options)
        .alias("proto_event")
    )
)

# Convert SQL struct to binary Protobuf with to_protobuf():
protobuf_binary_df = (
  proto_events_df
    .selectExpr("struct(name, id, context) as event")
    .select(
      to_protobuf("event", options = schema_registry_options)
        .alias("proto_bytes")
    )
)

Scala

import org.apache.spark.sql.protobuf.functions._
import scala.collection.JavaConverters._

val schemaRegistryOptions = Map(
    "schema.registry.subject" -> "app-events-value",
    "schema.registry.address" -> "https://schema-registry:8081/"
)

// Convert binary Protobuf to SQL struct with from_protobuf():
val protoEventsDF = inputDF
    .select(
        from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
            .as("proto_event")
    )

// Convert SQL struct to binary Protobuf with to_protobuf():
val protobufBinaryDF = protoEventsDF
    .selectExpr("struct(name, id, context) as event")
    .select(
        to_protobuf($"event", options = schemaRegistryOptions.asJava)
            .as("proto_bytes")
    )

向外部 Confluent 架构注册表进行身份验证

若要向外部 Confluent 架构注册表进行身份验证,请更新架构注册表选项以包含身份验证凭据和 API 密钥。

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
    "confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
  }

Scala

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)

Unity 目录卷中使用信任存储和密钥存储文件

在 Databricks Runtime 14.3 LTS 及更高版本中,可以使用 Unity Catalog 卷中的信任存储和密钥存储文件向 Confluent 架构注册表进行身份验证。 根据以下示例更新架构注册表选项:

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
    "confluent.schema.registry.ssl.truststore.password" : "<password>",
    "confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
    "confluent.schema.registry.ssl.keystore.password" : "<password>",
    "confluent.schema.registry.ssl.key.password" : "<password>"
  }

Scala

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "<password>",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
      "confluent.schema.registry.ssl.keystore.password" -> "<password>",
      "confluent.schema.registry.ssl.key.password" -> "<password>"
)

将 Protobuf 与描述符文件配合使用

还可以引用可用于计算群集的 protobuf 描述符文件。 请确保你具有读取文件的适当权限,具体取决于其位置。

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf

descriptor_file = "/path/to/proto_descriptor.desc"

proto_events_df = (
    input_df.select(
      from_protobuf(input_df.value, "BasicMessage", descFilePath=descriptor_file).alias("proto")
    )
)

proto_binary_df = (
  proto_events_df
    .select(
      to_protobuf(proto_events_df.proto, "BasicMessage", descriptor_file).alias("bytes")
    )
)

Scala

import org.apache.spark.sql.protobuf.functions._

val descriptorFile = "/path/to/proto_descriptor.desc"

val protoEventsDF = inputDF
  .select(
    from_protobuf($"value", "BasicMessage", descFilePath=descriptorFile).as("proto")
  )

val protoBytesDF = protoEventsDF
  .select(
    to_protobuf($"proto", "BasicMessage", descriptorFile).as("bytes")
  )

Protobuf 函数中支持的选项

Protobuf 函数中支持以下选项。

  • mode:确定反序列化 Protobuf 记录时的错误处理方式。 这些错误可能由各种类型的格式错误的记录引起,包括记录的实际架构与 from_protobuf() 中提供的预期架构不匹配。
      • FAILFAST(默认值):遇到格式错误的记录且任务失败时,会引发错误。
      • PERMISSIVE:为格式错误的记录返回 NULL。 请谨慎使用此选项,因为它可能导致丢弃许多记录。 当源中的一小部分记录不正确时,这非常有用。
  • recursive.fields.max.depth:添加对递归字段的支持。 Spark SQL 架构不支持递归字段。 未指定此选项时,不允许递归字段。 为了在 Protobuf 中支持递归字段,需要将该字段扩展到指定的深度。
      • -1 (默认值):不允许递归字段。

      • 0:丢弃递归字段。

      • 1:允许单个级别的递归。

      • [2-10]:指定多级递归的阈值,最多为 10 级。

        将值设置为大于 0 可以通过将嵌套字段扩展到配置的深度来允许递归字段。 不允许设置大于 10 的值,以避免无意中创建非常大的架构。 如果 Protobuf 消息的深度超出了配置的限制,则返回的 Spark 结构将在超出递归限制后被截断。

    • 示例:假设有一个具有以下递归字段的 Protobuf:

      message Person { string name = 1; Person friend = 2; }
      

      下面列出了此设置具有不同值的结束架构:

      • 选项设置为 1:STRUCT<name: STRING>
      • 选项设置为 2:STRUCT<name STRING, friend: STRUCT<name: STRING>>
      • 选项设置为 3:STRUCT<name STRING, friend: STRUCT<name STRING, friend: STRUCT<name: STRING>>>
  • convert.any.fields.to.json:此选项可用于将 Protobuf Any 字段转换为 JSON。 应谨慎启用此功能。 JSON 转换和处理效率低下。 此外,JSON 字符串字段会丢失 Protobuf 架构安全性,使下游处理容易出错。
      • False(默认值):在运行时,此类通配符字段可以以二进制数据的形式包含任意 Protobuf 消息。 默认情况下,此类字段的处理方式类似于普通 Protobuf 消息。 它有两个架构为 (STRUCT<type_url: STRING, value: BINARY>) 的字段。 默认情况下,不会以任何方式解释二进制 value 字段。 但在实际操作中,二进制数据在一些应用程序中可能不方便使用。
      • True:通过将此值设置为 True,可在运行时将 Any 字段转换为 JSON 字符串。 使用此选项,将分析二进制文件,并将 Protobuf 消息反序列化为 JSON 字符串。
    • 示例:假设有两个定义如下的 Protobuf 类型:

      message ProtoWithAny {
         string event_name = 1;
         google.protobuf.Any details = 2;
      }
      
      message Person {
         string name = 1;
         int32 id = 2;
      }
      

      启用此选项后,from_protobuf("col", messageName ="ProtoWithAny") 的架构为:STRUCT<event_name: STRING, details: STRING>

      在运行时,如果 details 字段包含 Person Protobuf 消息,则返回的值如下所示:('click', '{"@type":"type.googleapis.com/...ProtoWithAny","name":"Mario","id":100}')

    • 要求:

      • Any 字段中使用的所有可能的 Protobuf 类型的定义应在传递给 from_protobuf() 的 Protobuf 描述符文件中提供。
      • 如果找不到 Any Protobuf,则会导致该记录出错。
      • Schema-registry 目前不支持此功能。
  • emit.default.values:在将 Protobuf 反序列化为 Spark 结构时,启用零值呈现字段。 应谨慎使用此选项。 通常不建议依赖语义中的这种细微差异。
      • False(默认值):当序列化 Protobuf 中的字段为空时,Spark 结构中生成的字段默认为 null。 不启用此选项并将 null 视为默认值会更简单。
      • True:启用此选项后,会用相应的默认值填充这些字段。
    • 示例:假设以下 Protobuf 的 Protobuf 构造类似于 Person(age=0, middle_name="")

      syntax = "proto3";
      
      message Person {
         string name = 1;
         int64 age = 2;
         optional string middle_name = 3;
         optional int64 salary = 4;
      }
      
      • 如果此选项设置为 False,则调用 from_protobuf() 后的 Spark 结构将全部为 null:{"name": null, "age": null, "middle_name": "", "salary": null}。 即使设置了两个字段(agemiddle_name)的值,Protobuf 也不以有线格式包含它们,因为它们是默认值。
      • 如果此选项设置为 True,则调用 from_protobuf() 后的 Spark 结构将为:{"name": "", "age": 0, "middle_name": "", "salary": null}salary 字段保留为 null,因为它被显式声明为 optional,并且未在输入记录中设置。
  • enums.as.ints:启用后,Protobuf 中的枚举字段在 Spark 中呈现为整数字段。
      • False(默认值)
      • True:启用后,Protobuf 中的枚举字段在 Spark 中呈现为整数字段。
    • 示例:考虑以下 Protobuf

      syntax = "proto3";
      
      message Person {
         enum Job {
           NONE = 0;
           ENGINEER = 1;
           DOCTOR = 2;
           NURSE = 3;
         }
         Job job = 1;
      }
      

      给定一条类似 Person(job = ENGINEER) 的 Protobuf 消息:

      • 禁用此选项后,相应的 Spark 结构将为 {"job": "ENGINEER"}
      • 启用此选项后,相应的 Spark 结构将为 {"job": 1}

      请注意,这些字段的架构在每种情况下都是不同的(整数而不是默认字符串)。 此类更改可能会影响下游表的架构。

架构注册表选项

将架构注册表与 Protobuf 函数一起使用时,以下架构注册表选项是相关的。

  • schema.registry.subject
    • 必须
    • 指定架构注册表中架构的主题,例如“client-event”
  • schema.registry.address
    • 必须
    • 架构注册表的 URL,例如 https://schema-registry.example.com:8081
  • schema.registry.protobuf.name
    • 可选
    • 默认:<NONE>
    • 主题的 schema-registry 条目可以包含多个 Protobuf 定义,就像单个 proto 文件一样。 如果未指定此选项,则第一个 Protobuf 用于架构。 当 Protobuf 消息不是条目中的第一个消息时,请指定该消息的名称。 例如,假设有两个 Protobuf 定义的条目:顺序为“Person”和“Location”。 如果该流对应于“Location”而不是“Person”,请将此选项设置为“Location”(或其全名,包括包“com.example.protos.Location”)。
  • schema.registry.schema.evolution.mode
    • 默认值:“重启”。
    • 支持的模式:
      • "restart"
      • "none"
    • 此选项为 from_protobuf() 设置架构演变模式。 在查询开始时,Spark 会记录给定主题的最新 schema-id。 这可确定 from_protobuf() 的架构。 查询启动后,可能会将新架构发布到架构注册表。 当在传入记录中发现较新的 schema-id 时,它表明架构发生了更改。 此选项确定如何处理此类架构更改:
      • restart(默认值):在发现较新的 schema-id 时触发 UnknownFieldException。 这会终止查询。 Databricks 建议将作业配置为在查询失败时重启,以获取架构更改。
      • none:忽略 schema-id 更改。 具有较新 schema-id 的记录使用在查询开始时观察到的同一架构进行分析。 较新的 Protobuf 定义应向后兼容,并忽略新字段。
  • confluent.schema.registry.<schema-registy-client-option>
    • 可选
    • Schema-registry 使用 Confluent 架构注册表客户端连接到 Confluent schema-registry。 客户端支持的任何配置选项都可以使用前缀“confluent.schema.registry”指定。 例如,以下两个设置提供“USER_INFO”身份验证凭据:
      • "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO'
      • "confluent.schema.registry.basic.auth.user.info": "<KEY> : <SECRET>"