什么是 Python 用户定义的表函数?

重要

此功能目前以公共预览版提供。

用户定义的表函数 (UDTF) 允许注册返回表而不是标量值的函数。 在 SQL 查询中引用时,UDTF 与常见表表达式 (CTE) 类似。 在 SQL 语句的 FROM 子句中引用 UDTF,可以将其他 Spark SQL 运算符链接到结果。

UDTF 将注册到本地 SparkSession,并在笔记本或作业级别隔离。

在配置了已分配或无隔离共享访问模式的计算上支持 UDTF。 不能在共享访问模式下使用 UDTF。

不能将 UDTF 注册为 Unity Catalog 中的对象,UDDF 不能用于 SQL 仓库。

UDTF 的基本语法是什么?

Apache Spark 使用必需的 eval 方法将 Python UDDF 实现为 Python 类。

使用 yield 以行的形式发出结果。

要使 Apache Spark 将类用作 UDTF,必须导入 PySpark udtf 函数。

Databricks 建议将此函数用作修饰器,并且始终使用 returnType 选项显式指定字段名称和类型。

以下示例使用 UDTF 从标量输入创建一个简单的表:

from pyspark.sql.functions import lit, udtf

@udtf(returnType="sum: int, diff: int")
class SimpleUDTF:
    def eval(self, x: int, y: int):
        yield x + y, x - y

SimpleUDTF(lit(1), lit(2)).show()
# +----+-----+
# | sum| diff|
# +----+-----+
# |   3|   -1|
# +----+-----+

可以使用 Python *args 语法并实现逻辑来处理未指定数量的输入值。 以下示例会返回相同的结果,同时显式检查参数的输入长度和类型:

@udtf(returnType="sum: int, diff: int")
class SimpleUDTF:
    def eval(self, *args):
        assert(len(args) == 2)
        assert(isinstance(arg, int) for arg in args)
        x = args[0]
        y = args[1]
        yield x + y, x - y

SimpleUDTF(lit(1), lit(2)).show()
# +----+-----+
# | sum| diff|
# +----+-----+
# |   3|   -1|
# +----+-----+

注册 UDTF

可以使用以下语法将 UDTF 注册到当前 SparkSession,以便在 SQL 查询中使用:

spark.udtf.register("<udtf-sql-name>", <udtf-python-name>)

以下示例将 Python UDTF 注册到 SQL:

spark.udtf.register("simple_udtf", SimpleUDTF)

注册后,可以使用 %sql magic 命令或 spark.sql() 函数在 SQL 中使用 UDTF,如以下示例所示:

%sql
SELECT * FROM simple_udtf(1,2);
spark.sql("SELECT * FROM simple_udtf(1,2);")

生成结果

Python UDTF 通过 yield 实现,以返回结果。 结果始终作为包含具有指定架构的 0 行或更多行的表返回。

传递标量参数时,eval 方法中的逻辑在传递标量参数集的情况下完全运行一次。 对于表参数,eval 方法针对输入表中的每一行运行一次。

可以写入逻辑以针对每个输入返回 0、1 或多行。

以下 UDTF 演示了通过将逗号分隔列表中的项目分隔为单独的条目,为每个输入返回 0 行或更多行:

from pyspark.sql.functions import udtf

@udtf(returnType="id: int, item: string")
class Itemize:
    def eval(self, id: int, item_list: str):
        items = item_list.split(",")
        for item in items:
            if item != "":
                yield id, item

将表参数传递给 UDTF

可以使用 SQL 关键字 TABLE() 将表参数传递给 UDTF。 可以使用表名或查询,如以下示例所示:

TABLE(table_name);
TABLE(SELECT * FROM table_name);

表参数一次处理一行。 可以使用标准 PySpark 列字段注释与每行中的列进行交互。 以下示例演示如何显式导入 PySpark Row 类型,然后在 id 字段中筛选传递的表:

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="id: int")
class FilterUDTF:
    def eval(self, row: Row):
        if row["id"] > 5:
            yield row["id"],

spark.udtf.register("filter_udtf", FilterUDTF)

spark.sql("SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)))").show()
# +---+
# | id|
# +---+
# |  6|
# |  7|
# |  8|
# |  9|
# +---+

将标量参数传递给 UDTF

可以使用以下值的任意组合将标量参数传递给 UDTF:

  • 标量常数
  • 标量函数
  • 关系中的字段

若要在关系中传递字段,必须注册 UDTF 并使用 SQL LATERAL 关键字。

注意

可以使用内联表别名消除列的歧义。

以下示例演示如何使用 LATERAL 将字段从表传递到 UDTF:

from pyspark.sql.functions import udtf

@udtf(returnType="id: int, item: string")
class Itemize:
    def eval(self, id: int, item_list: str):
        items = item_list.split(",")
        for item in items:
            if item != "":
                yield id, item

spark.udtf.register("itemize", Itemize)

spark.sql("""
    SELECT b.id, b.item FROM VALUES (1, 'pots,pans,forks'),
    (2, 'spoons,'),
    (3, ''),
    (4, 'knives,cups') t(id, item_list),
    LATERAL itemize(id, item_list) b
""").show()

设置 UDDF 的默认值

可以选择性地实现 __init__ 方法来设置可在 Python 逻辑中引用的类变量的默认值。

__init__ 方法不接受任何参数,并且无法访问 SparkSession 中的变量或状态信息。

将 Apache Arrow 与 UDDF 配合使用

Databricks 建议对 UDTF 使用 Apache Arrow,这些 UDTF 接收少量数据作为输入,但输出大型表。

可以通过在声明 UDTF 时指定 useArrow 参数来启用 Arrow,如以下示例所示:

from pyspark.sql.functions import udtf

@udtf(returnType="c1: int, c2: int", useArrow=True)
class PlusOne:
    def eval(self, x: int):
        yield x, x + 1