User-defined functions - Python

This article contains Python user-defined function (UDF) examples. It shows how to register UDFs, how to invoke UDFs, and caveats regarding evaluation order of subexpressions in Spark SQL.

Register a function as a UDF

def squared(s):
  return s * s
# Register the Python function as a SQL UDF named "squaredWithPython"
spark.udf.register("squaredWithPython", squared)

You can optionally set the return type of your UDF. The default return type is StringType.

from pyspark.sql.types import LongType
def squared_typed(s):
  return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())

Call the UDF in Spark SQL

spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test
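
The %sql magic is a notebook feature; from plain Python you can run the same query with spark.sql (a minimal equivalent, assuming the test view and the squaredWithPython registration above):

# Same query issued through the Python API rather than a %sql cell
df_squared = spark.sql("select id, squaredWithPython(id) as id_squared from test")
df_squared.show()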

Use UDF with DataFrames

from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

Alternatively, you can declare the same UDF using annotation syntax:

from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
  return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
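
Note that display is a Databricks notebook helper. In a plain PySpark session, a standard DataFrame action gives the same result (a minimal sketch, reusing the squared_udf defined above):

# Outside a Databricks notebook, use show() instead of display()
df.select("id", squared_udf("id").alias("id_squared")).show()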

Evaluation order and null checking

Spark SQL (including SQL and the DataFrame and Dataset APIs) does not guarantee the order of evaluation of subexpressions. In particular, the inputs of an operator or function are not necessarily evaluated left-to-right or in any other fixed order. For example, logical AND and OR expressions do not have left-to-right “short-circuiting” semantics.

Therefore, it is dangerous to rely on the side effects or order of evaluation of Boolean expressions, or on the order of WHERE and HAVING clauses, since such expressions and clauses can be reordered during query optimization and planning. Specifically, if a UDF relies on short-circuiting semantics in SQL for null checking, there is no guarantee that the null check will happen before the UDF is invoked. For example:

spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee

This WHERE clause does not guarantee that the strlen UDF is invoked only after the nulls have been filtered out; if strlen runs on a null input first, len(None) raises a TypeError and the query fails.

To perform proper null checking, we recommend that you do either of the following:

  • Make the UDF itself null-aware and do null checking inside the UDF itself
  • Use IF or CASE WHEN expressions to do the null check and invoke the UDF in a conditional branch, as shown below

spark.udf.register("strlen_nullsafe", lambda s: len(s) if s is not None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") # ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1")   # ok
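
The IF example above can equally be written with CASE WHEN, the other conditional expression mentioned in the recommendations (a minimal sketch, reusing the strlen UDF registered earlier):

# CASE WHEN keeps the UDF call inside a conditional branch, like if()
spark.sql("select s from test1 where case when s is not null then strlen(s) else null end > 1") # ok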