Lakeflow Designer 中的用户定义的运算符

Important

此功能目前以公共预览版提供。

使用 Lakeflow Designer,可以创建直接显示在画布中的 用户定义的运算符 以及内置运算符。 使用它们借助您自己的业务逻辑、计算逻辑或集成来扩展 Lakeflow Designer。

有三种类型的用户定义的运算符:

  • python-run-function:存储在工作区中的一个独立 YAML 文件,包含内联 Python。 最适合用于 DataFrame 层级转换和外部集成。 权限在工作区文件层级进行管理。
  • uc-udf:封装 Unity Catalog 标量函数。 最适合进行列级转换。 访问受 Unity 目录权限的约束。
  • uc-udtf:封装 Unity Catalog 表值函数。 最适合表级转换,例如 ML 聚类分析和聚合。 访问受 Unity 目录权限的约束。
功能 python-run-function uc-udf uc-udtf
示例用例 数据帧转换、API 集成、电子邮件通知 列级计算 (BMI, 利率) ML 聚类分析,跨行聚合
Input 数据帧 单个值 整个表,逐行排列
Output 数据帧 单个值 表(多行)
需要 Unity 目录函数 是的 是的
访问治理 工作区文件权限 Unity Catalog 权限(EXECUTEUSE SCHEMA Unity Catalog 权限(EXECUTEUSE SCHEMA
支持的语言 仅 Python SQL 封装器中的 SQL 或 Python SQL 封装器中的 SQL 或 Python

用户定义的运算符的工作原理是什么?

用户定义的运算符包括:

  • 运算符逻辑:执行运算符时运行的代码。 可以是内联 Python run() 函数(适用于 python-run-function)或 Unity 目录函数(适用于 uc-udfuc-udtf)。
  • YAML 配置:告诉 Lakeflow Designer 如何在 UI 中显示运算符,包括操作员的名称、说明、输入参数、UI 小组件和端口。 所有运算符类型都使用 user-defined-operator-v0.1.0 架构。
  • 注册文件:允许 Lakeflow Designer 发现操作员的条目 .user_defined_operators.yaml

运算符逻辑

Python 运行函数中的用户定义运算符逻辑

每个 python-run-function 运算符都必须定义一个 run() 函数:

def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
  • config:UI 中用户配置的值,按属性名称键键。
  • inputs:输入 DataFrame,以输入端口 name为键。
  • spark:当前活动的 SparkSession。
  • 返回:字典将输出端口 name 值映射到 DataFrames。

以下示例筛选输入数据帧中的行:

def run(config, inputs, spark):
    df = inputs["in"]
    filtered = df.filter(config["filter_expression"])
    return {"out": filtered}

如果操作员需要外部 pip 包,请将 environment 字段添加到 YAML:

environment:
  environment_version: '1'
  dependencies:
    - requests==2.31.0
    - beautifulsoup4==4.12.0

UDF 和 UDTF 运算符逻辑

可以在 SQL 或Python中编写 UC 函数。 Python函数包装在 SQL CREATE FUNCTION 语句中:

SQL 函数:

CREATE OR REPLACE FUNCTION my_catalog.my_schema.calculate_bmi(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE SQL
RETURN
  SELECT weight_kg / (height_m * height_m);

Python 函数(包装在 SQL 中):

CREATE OR REPLACE FUNCTION my_catalog.my_schema.calculate_bmi(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
AS $$
  return weight_kg / (height_m ** 2)
$$;

UDF 一次处理单个值并返回计算值。 UDTF 逐行处理表中的数据,并可在所有行之间保持状态。 使用 uc-udf 进行列级转换,使用 uc-udtf 进行机器学习聚类或聚合等操作。

此外,UDTF 要求你定义三个关键方法:__init__()eval()terminate()

class MyOperator:
    def __init__(self):
        # Called before processing - initialize any values needed.

    def eval(self, row, id_column, columns, k):
        # Called once per input row - accumulate data here.

    def terminate(self):
        # Called after all rows - perform final calculations and yield results.

注释

UDTF 返回表必须具有固定的显式类型。 不能在返回配置中引用输入列类型。

YAML 配置

YAML 配置告知 Lakeflow Designer 如何在 UI 中显示运算符。 它定义操作员的名称、说明、输入参数、UI 小组件和端口。 每个配置字段都是具有类型、标题和可选 x-ui 微件提示的属性:

config:
  type: object
  properties:
    my_param:
      type: string
      title: My Parameter
      x-ui:
        widget: input
    my_expression:
      type: string
      title: Column
      format: expression
      x-ui:
        widget: expression
        port: in
    my_number:
      type: number
      title: Count
      default: 10
      minimum: 0
      maximum: 100
  required:
    - my_param
    - my_expression

有关 YAML 架构的完整详细信息,包括所有小组件类型和配置选项,请参阅 用户定义的运算符 YAML 参考

Ports

端口定义操作员的输入和输出:

ports:
  input:
    - name: in
      title: Input Data
      mime: application/vnd.databricks.dataframe
      required: true
      allowMultiple: false
  output:
    - name: out
      title: Output Data

用于Python运行函数运算符的 YAML

对于 python-run-function 运算符,YAML 文件是独立的,包含内联Python代码的 run_function 字段:

schema: user-defined-operator-v0.1.0
type: python-run-function
name: Filter Rows
id: filter_rows
version: '1.0.0'
description: Filters rows based on a SQL expression.
config:
  type: object
  properties:
    filter_expression:
      type: string
      title: Filter Expression
      x-ui:
        widget: input
  required:
    - filter_expression
ports:
  input:
    - name: in
      title: Input
  output:
    - name: out
      title: Output
run_function:
  type: inline
  code: |
    def run(config, inputs, spark):
        df = inputs["in"]
        filtered = df.filter(config["filter_expression"])
        return {"out": filtered}

用于 Unity Catalog 函数的 YAML

对于基于 UC 的运算符,请将 YAML 配置作为注释或 docstring 嵌入函数中。

在 SQL 中 (使用 /* ... */ 注释):

RETURN(/*
  schema: user-defined-operator-v0.1.0
  type: uc-udf
  name: Calculate BMI
  id: calculate_bmi
  version: "1.0.0"
  description: Calculates BMI from weight and height.
  config:
    type: object
    properties:
      weight_kg:
        type: string
        title: Weight (in kg)
        format: expression
        x-ui:
          widget: expression
          port: in
      height_m:
        type: string
        title: Height (in meters)
        format: expression
        x-ui:
          widget: expression
          port: in
    required:
      - weight_kg
      - height_m
  ports:
    input:
      - name: in
        title: Input Data
    output:
      - name: out
        title: Output
    */
  SELECT weight_kg / (height_m * height_m)
);

In Python(use """ ... """ docstring):

AS $$
  """
  schema: user-defined-operator-v0.1.0
  type: uc-udf
  name: Calculate BMI
  id: calculate_bmi
  version: "1.0.0"
  description: Calculates BMI from weight and height.
  config:
    type: object
    properties:
      weight_kg:
        type: string
        title: Weight (in kg)
        format: expression
        x-ui:
          widget: expression
          port: in
      height_m:
        type: string
        title: Height (in meters)
        format: expression
        x-ui:
          widget: expression
          port: in
    required:
      - weight_kg
      - height_m
  ports:
    input:
      - name: in
        title: Input Data
    output:
      - name: out
        title: Output
  """

  return weight_kg / (height_m ** 2)
$$;

将操作员注册并部署到 Lakeflow Designer

若要使操作器显示在 Lakeflow Designer 中,请在 .user_defined_operators.yaml 文件中注册它:

  • 工作区级别: 将文件放在工作区的根目录中,使操作员对所有用户可见。
  • 用户级别: 将文件放在用户主文件夹 (/Workspace/Users/<user-name>/.user_defined_operators.yaml) 中,使操作员仅对你可见。

operators: 部分支持文件路径、Unity 目录函数引用和 glob 模式。 可以混合输入类型:

operators:
  # File path (python-run-function operators)
  - /Workspace/Users/me/udos/my_operator.yaml
  # Glob pattern (registers all matching files)
  - /Workspace/Users/me/udos/transforms/*.yaml
  # UC function reference (uc-udf and uc-udtf operators)
  - catalog: my_catalog
    schema: my_schema
    functionName: my_function

高级配置

预览模式

在设计模式下,Lakeflow Designer 支持预览。 对于调用外部 API 或写入外部系统的运算符,请添加配置 is_preview 属性,以便在预览期间跳过副作用。 启用预览模式后,用户需要显式单击“ 运行 ”以执行具有副作用的运算符。

config:
  type: object
  properties:
    is_preview:
      type: boolean
      format: is_preview
      default: false

Lakeflow Designer 会在预览期间自动将此值设置为 true 。 在逻辑中进行判断以跳过副作用:

# In a python-run-function
if config.get("is_preview"):
    return {"out": inputs["in"]}

# In a UC function (SQL)
CASE WHEN is_preview THEN 'preview' ELSE /* actual work */ END

Unity Catalog 连接

对于调用外部 API 的基于 Unity Catalog 的 SQL 操作符,请使用 Unity Catalog HTTP 连接来安全存储凭据:

CREATE CONNECTION my_api_connection TYPE HTTP OPTIONS (
  host 'https://api.example.com',
  port '443',
  base_path '/v1/',
  bearer_token 'your-token-here'
);

然后在 SQL UDF 中使用该连接和 http_request() 函数。 有关详细信息,请参阅 “连接到外部 HTTP 服务”。

WorkspaceClient

对于 python-run-function 运算符,可以使用 Azure Databricks WorkspaceClient 访问工作区资源和外部 API:

def run(config, inputs, spark):
    from databricks.sdk import WorkspaceClient
    w = WorkspaceClient()
    # Use w to access workspace resources

创建完整的 python-run-function 用户定义运算符

以下步骤逐步讲解如何从头开始创建 python-run-function 操作员。

步骤 1:定义逻辑

在笔记本中编写您的 run() 函数:

from typing import Dict, Any

def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
    from pyspark.sql import functions as F
    df = inputs["in"]
    result = df.withColumn(config["column_name"], F.current_timestamp())
    return {"out": result}

步骤 2:测试函数

使用示例数据以交互方式测试函数:

test_df = spark.createDataFrame(
    [("Alice", 100), ("Bob", 200)],
    ["name", "amount"]
)

result = run(
    config={"column_name": "processed_at"},
    inputs={"in": test_df},
    spark=spark
)

result["out"].show()

步骤 3:创建 YAML 配置

定义 YAML 文件中的操作员元数据、配置字段和端口:

schema: user-defined-operator-v0.1.0
type: python-run-function
name: Add Timestamp
id: transforms.add_timestamp
version: '1.0.0'
description: Adds a timestamp column to the input DataFrame.
config:
  type: object
  properties:
    column_name:
      type: string
      title: Column Name
      default: processed_at
      x-ui:
        widget: input
  required:
    - column_name

步骤 4:合并逻辑和 YAML

添加 run_functionports 字段,以创建完整的 YAML 文件。 将其保存到工作区,例如 /Workspace/Users/<user-name>/udos/add_timestamp.yaml

schema: user-defined-operator-v0.1.0
type: python-run-function
name: Add Timestamp
id: transforms.add_timestamp
version: '1.0.0'
description: Adds a timestamp column to the input DataFrame.
config:
  type: object
  properties:
    column_name:
      type: string
      title: Column Name
      default: processed_at
      x-ui:
        widget: input
  required:
    - column_name
ports:
  input:
    - name: in
      title: Input
  output:
    - name: out
      title: Output
run_function:
  type: inline
  code: |
    from typing import Dict, Any

    def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
        from pyspark.sql import functions as F
        df = inputs["in"]
        result = df.withColumn(config["column_name"], F.current_timestamp())
        return {"out": result}

步骤 5:注册操作员

将文件路径添加到 .user_defined_operators.yaml 文件:

operators:
  - /Workspace/Users/<user-name>/udos/add_timestamp.yaml

步骤 6:在 Lakeflow Designer 中使用运算符

打开 Lakeflow 设计器并验证运算符是否显示在运算符面板中。 将其拖到画布上、连接输入、配置列名并运行预览。

创建完整的 UC 用户定义运算符

以下步骤将介绍如何创建基于 UC 的 uc-udf 运算符。

步骤 1:定义逻辑

在笔记本中编写和测试函数逻辑:

def double_value(input_value: float) -> float:
    if input_value is None:
        return None
    return input_value * 2

步骤 2:创建 YAML 配置

定义操作员元数据、配置字段和端口:

schema: user-defined-operator-v0.1.0
type: uc-udf
name: Double Value
id: math.double_value
version: '1.0.0'
description: Doubles the input value
config:
  type: object
  properties:
    input_value:
      type: string
      title: Input Value
      format: expression
      x-ui:
        widget: expression
        port: input_data
  required:
    - input_value
ports:
  input:
    - name: input_data
      title: Input
  output:
    - name: out
      title: Output

步骤 3:合并逻辑和 YAML

创建 Unity Catalog 函数,并将 YAML 以内嵌文档字符串(docstring)的形式包含其中:

CREATE OR REPLACE FUNCTION main.my_schema.double_value(input_value DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
AS $$
  """
  schema: user-defined-operator-v0.1.0
  type: uc-udf
  name: Double Value
  id: math.double_value
  version: "1.0.0"
  description: Doubles the input value
  config:
    type: object
    properties:
      input_value:
        type: string
        title: Input Value
        format: expression
        x-ui:
          widget: expression
          port: input_data
    required:
      - input_value
  ports:
    input:
      - name: input_data
        title: Input
    output:
      - name: out
        title: Output
  """

  def double_value(input_value: float) -> float:
      if input_value is None:
          return None
      return input_value * 2

  return double_value(input_value)
$$

步骤 4:测试函数

SELECT main.my_schema.double_value(5) AS result;
-- Should return: 10

步骤 5:注册操作员

将 Unity 目录函数引用添加到 .user_defined_operators.yaml 文件:

operators:
  - catalog: main
    schema: my_schema
    functionName: double_value

步骤 6:在 Lakeflow Designer 中使用运算符

打开 Lakeflow 设计器并验证运算符是否显示在运算符面板中。 将其拖到画布上,连接输入并运行预览。

故障排除

问题 解决方案
运算符不会出现在 Lakeflow Designer 中。 检查是否存在 .user_defined_operators.yaml,并确认其中列出了你的函数或文件路径。 对于 python-run-function 运算符,请验证文件路径以及 YAML 文件是否可访问。
架构验证失败。 对照 https://your-workspace.cloud.databricks.com/static/schemas/user-defined-operator-v0.1.0.json 中的官方架构定义校验您的 YAML。
权限被拒绝。” 对于基于 UC 的操作员,请验证用户是否对该函数具有 EXECUTE 权限,并对该架构具有 USE SCHEMA 权限。 对于 python-run-function 操作员,请验证用户对 YAML 文件的读取访问权限。
python-run-function 运算符在运行时失败。 检查函数签名是否 run() 匹配 def run(config, inputs, spark)。 验证代码中的端口名称是否与 YAML 匹配,并且返回字典键是否与输出端口 name 值匹配。
UDTF 返回错误类型。 UDTF 返回类型必须是显式的 — 不能引用输入列类型。

Permissions

许可 Purpose
.user_defined_operators.yaml 了解运算符。
对 YAML 文件的读取权限(仅限 python-run-function)。 加载运算符定义。
对 Unity Catalog 函数执行EXECUTE(仅限基于 UC 的操作器)。 运行操作器。
仅在架构上使用 SCHEMA(仅基于 UC 的运算符)。 访问在其中创建函数的架构。
其他权限 根据操作员的不同,用户可能需要其他权限。 例如,在 Unity Catalog 连接上使用 USE CONNECTION 进行 HTTP API 调用。

后续步骤

浏览以下教程:

Example 类型 Description
Gmail 电子邮件发件人 python-run-function 通过 Gmail 以 CSV 电子邮件附件的形式发送 DataFrame 数据。
复利计算器 uc-udf 使用复利公式计算未来的投资值。
K-Means 群集 uc-udtf 使用 scikit-learn 将数据细分为群集。
所有用户界面组件 uc-udf 用于展示所有可用 UI 组件的参考运算符。

有关对 YAML 架构的完整引用,请参阅 用户定义的运算符 YAML 参考