将 Avro 格式的二进制列转换为其相应的催化剂值。 指定的架构必须与读取数据匹配,否则行为是未定义的:它可能会失败或返回任意结果。
如果未jsonFormatSchema提供但同时提供这subject两种格式schemaRegistryAddress,则函数会将架构注册表 Avro 格式的二进制列转换为其相应的催化剂值。
Syntax
from pyspark.sql.avro.functions import from_avro
from_avro(data, jsonFormatSchema=None, options=None, subject=None, schemaRegistryAddress=None)
参数
| 参数 | 类型 | 说明 |
|---|---|---|
data |
pyspark.sql.Column 或 str |
包含 Avro 编码数据的二进制列。 |
jsonFormatSchema |
str,可选 | JSON 字符串格式的 Avro 架构。 |
options |
dict,可选 | 用于控制如何分析和配置架构注册表客户端的 Avro 记录的选项。 |
subject |
str,可选 | 数据所属的架构注册表中的主题。 |
schemaRegistryAddress |
str,可选 | 架构注册表的地址(主机和端口)。 |
选项
| 选项 | 价值观 | 说明 |
|---|---|---|
mode |
FAILFAST、PERMISSIVE |
错误处理模式。 默认值:FAILFAST。 在 PERMISSIVE 模式下,损坏的记录设置为 NULL 而不是引发错误。 |
compression |
uncompressed、snappy、deflate、bzip2、xz、zstandard |
用于编码 Avro 数据的压缩编解码器。 |
avroSchemaEvolutionMode |
none、restart |
架构演变模式。 默认值:none。 设置为 |
recursiveFieldMaxDepth |
范围: -1 到 15 |
单个递归路径的最大递归深度。 默认值: -1不限制递归深度。当可从许多不同的架构路径访问共享类型时,架构扩展可能会导致驱动程序内存不足,因为此选项仅绑定一个路径的深度。 解决方法:
|
退货
pyspark.sql.Column:包含反序列化 Avro 数据作为相应催化剂值的新列。
示例
示例 1:使用 JSON 架构反序列化 Avro 二进制列
from pyspark.sql import Row
from pyspark.sql.avro.functions import from_avro, to_avro
data = [(1, Row(age=2, name='Alice'))]
df = spark.createDataFrame(data, ("key", "value"))
avro_df = df.select(to_avro(df.value).alias("avro"))
json_format_schema = '''{"type":"record","name":"topLevelRecord","fields":
[{"name":"avro","type":[{"type":"record","name":"value",
"namespace":"topLevelRecord","fields":[{"name":"age","type":["long","null"]},
{"name":"name","type":["string","null"]}]},"null"]}]}'''
avro_df.select(from_avro(avro_df.avro, json_format_schema).alias("value")).show(truncate=False)
+------------------+
|value |
+------------------+
|{{2, Alice}} |
+------------------+