from_avro

将 Avro 格式的二进制列转换为其相应的催化剂值。 指定的架构必须与读取数据匹配,否则行为是未定义的:它可能会失败或返回任意结果。

如果未jsonFormatSchema提供但同时提供这subject两种格式schemaRegistryAddress,则函数会将架构注册表 Avro 格式的二进制列转换为其相应的催化剂值。

Syntax

from pyspark.sql.avro.functions import from_avro

from_avro(data, jsonFormatSchema=None, options=None, subject=None, schemaRegistryAddress=None)

参数

参数 类型 说明
data pyspark.sql.Column 或 str 包含 Avro 编码数据的二进制列。
jsonFormatSchema str,可选 JSON 字符串格式的 Avro 架构。
options dict,可选 用于控制如何分析和配置架构注册表客户端的 Avro 记录的选项。
subject str,可选 数据所属的架构注册表中的主题。
schemaRegistryAddress str,可选 架构注册表的地址(主机和端口)。

选项

选项 价值观 说明
mode FAILFASTPERMISSIVE 错误处理模式。 默认值:FAILFAST。 在 PERMISSIVE 模式下,损坏的记录设置为 NULL 而不是引发错误。
compression uncompressedsnappydeflatebzip2xzzstandard 用于编码 Avro 数据的压缩编解码器。
avroSchemaEvolutionMode nonerestart 架构演变模式。 默认值:none。 设置为 /> 时,查询将引发架构更改时。 重启作业以使用新架构。 请参阅 将架构演变模式与from_avro配合使用
recursiveFieldMaxDepth 范围: -115 单个递归路径的最大递归深度。 默认值: -1不限制递归深度。
当可从许多不同的架构路径访问共享类型时,架构扩展可能会导致驱动程序内存不足,因为此选项仅绑定一个路径的深度。 解决方法:

退货

pyspark.sql.Column:包含反序列化 Avro 数据作为相应催化剂值的新列。

示例

示例 1:使用 JSON 架构反序列化 Avro 二进制列

from pyspark.sql import Row
from pyspark.sql.avro.functions import from_avro, to_avro

data = [(1, Row(age=2, name='Alice'))]
df = spark.createDataFrame(data, ("key", "value"))
avro_df = df.select(to_avro(df.value).alias("avro"))
json_format_schema = '''{"type":"record","name":"topLevelRecord","fields":
    [{"name":"avro","type":[{"type":"record","name":"value",
    "namespace":"topLevelRecord","fields":[{"name":"age","type":["long","null"]},
    {"name":"name","type":["string","null"]}]},"null"]}]}'''
avro_df.select(from_avro(avro_df.avro, json_format_schema).alias("value")).show(truncate=False)
+------------------+
|value             |
+------------------+
|{{2, Alice}}      |
+------------------+