Important
此功能目前以公共预览版提供。
使用 Lakeflow Designer,可以创建直接显示在画布中的 用户定义的运算符 以及内置运算符。 使用它们借助您自己的业务逻辑、计算逻辑或集成来扩展 Lakeflow Designer。
有三种类型的用户定义的运算符:
-
python-run-function:存储在工作区中的一个独立 YAML 文件,包含内联 Python。 最适合用于 DataFrame 层级转换和外部集成。 权限在工作区文件层级进行管理。 -
uc-udf:封装 Unity Catalog 标量函数。 最适合进行列级转换。 访问受 Unity 目录权限的约束。 -
uc-udtf:封装 Unity Catalog 表值函数。 最适合表级转换,例如 ML 聚类分析和聚合。 访问受 Unity 目录权限的约束。
| 功能 | python-run-function |
uc-udf |
uc-udtf |
|---|---|---|---|
| 示例用例 | 数据帧转换、API 集成、电子邮件通知 | 列级计算 (BMI, 利率) | ML 聚类分析,跨行聚合 |
| Input | 数据帧 | 单个值 | 整个表,逐行排列 |
| Output | 数据帧 | 单个值 | 表(多行) |
| 需要 Unity 目录函数 | 否 | 是的 | 是的 |
| 访问治理 | 工作区文件权限 | Unity Catalog 权限(EXECUTE、USE SCHEMA) |
Unity Catalog 权限(EXECUTE、USE SCHEMA) |
| 支持的语言 | 仅 Python | SQL 封装器中的 SQL 或 Python | SQL 封装器中的 SQL 或 Python |
用户定义的运算符的工作原理是什么?
用户定义的运算符包括:
-
运算符逻辑:执行运算符时运行的代码。 可以是内联 Python
run()函数(适用于python-run-function)或 Unity 目录函数(适用于uc-udf和uc-udtf)。 -
YAML 配置:告诉 Lakeflow Designer 如何在 UI 中显示运算符,包括操作员的名称、说明、输入参数、UI 小组件和端口。 所有运算符类型都使用
user-defined-operator-v0.1.0架构。 -
注册文件:允许 Lakeflow Designer 发现操作员的条目
.user_defined_operators.yaml。
运算符逻辑
Python 运行函数中的用户定义运算符逻辑
每个 python-run-function 运算符都必须定义一个 run() 函数:
def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
-
config:UI 中用户配置的值,按属性名称键键。 -
inputs:输入 DataFrame,以输入端口name为键。 -
spark:当前活动的 SparkSession。 -
返回:字典将输出端口
name值映射到 DataFrames。
以下示例筛选输入数据帧中的行:
def run(config, inputs, spark):
df = inputs["in"]
filtered = df.filter(config["filter_expression"])
return {"out": filtered}
如果操作员需要外部 pip 包,请将 environment 字段添加到 YAML:
environment:
environment_version: '1'
dependencies:
- requests==2.31.0
- beautifulsoup4==4.12.0
UDF 和 UDTF 运算符逻辑
可以在 SQL 或Python中编写 UC 函数。 Python函数包装在 SQL CREATE FUNCTION 语句中:
SQL 函数:
CREATE OR REPLACE FUNCTION my_catalog.my_schema.calculate_bmi(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE SQL
RETURN
SELECT weight_kg / (height_m * height_m);
Python 函数(包装在 SQL 中):
CREATE OR REPLACE FUNCTION my_catalog.my_schema.calculate_bmi(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
AS $$
return weight_kg / (height_m ** 2)
$$;
UDF 一次处理单个值并返回计算值。 UDTF 逐行处理表中的数据,并可在所有行之间保持状态。 使用 uc-udf 进行列级转换,使用 uc-udtf 进行机器学习聚类或聚合等操作。
此外,UDTF 要求你定义三个关键方法:__init__()、eval() 和 terminate():
class MyOperator:
def __init__(self):
# Called before processing - initialize any values needed.
def eval(self, row, id_column, columns, k):
# Called once per input row - accumulate data here.
def terminate(self):
# Called after all rows - perform final calculations and yield results.
注释
UDTF 返回表必须具有固定的显式类型。 不能在返回配置中引用输入列类型。
YAML 配置
YAML 配置告知 Lakeflow Designer 如何在 UI 中显示运算符。 它定义操作员的名称、说明、输入参数、UI 小组件和端口。 每个配置字段都是具有类型、标题和可选 x-ui 微件提示的属性:
config:
type: object
properties:
my_param:
type: string
title: My Parameter
x-ui:
widget: input
my_expression:
type: string
title: Column
format: expression
x-ui:
widget: expression
port: in
my_number:
type: number
title: Count
default: 10
minimum: 0
maximum: 100
required:
- my_param
- my_expression
有关 YAML 架构的完整详细信息,包括所有小组件类型和配置选项,请参阅 用户定义的运算符 YAML 参考。
Ports
端口定义操作员的输入和输出:
ports:
input:
- name: in
title: Input Data
mime: application/vnd.databricks.dataframe
required: true
allowMultiple: false
output:
- name: out
title: Output Data
用于Python运行函数运算符的 YAML
对于 python-run-function 运算符,YAML 文件是独立的,包含内联Python代码的 run_function 字段:
schema: user-defined-operator-v0.1.0
type: python-run-function
name: Filter Rows
id: filter_rows
version: '1.0.0'
description: Filters rows based on a SQL expression.
config:
type: object
properties:
filter_expression:
type: string
title: Filter Expression
x-ui:
widget: input
required:
- filter_expression
ports:
input:
- name: in
title: Input
output:
- name: out
title: Output
run_function:
type: inline
code: |
def run(config, inputs, spark):
df = inputs["in"]
filtered = df.filter(config["filter_expression"])
return {"out": filtered}
用于 Unity Catalog 函数的 YAML
对于基于 UC 的运算符,请将 YAML 配置作为注释或 docstring 嵌入函数中。
在 SQL 中 (使用 /* ... */ 注释):
RETURN(/*
schema: user-defined-operator-v0.1.0
type: uc-udf
name: Calculate BMI
id: calculate_bmi
version: "1.0.0"
description: Calculates BMI from weight and height.
config:
type: object
properties:
weight_kg:
type: string
title: Weight (in kg)
format: expression
x-ui:
widget: expression
port: in
height_m:
type: string
title: Height (in meters)
format: expression
x-ui:
widget: expression
port: in
required:
- weight_kg
- height_m
ports:
input:
- name: in
title: Input Data
output:
- name: out
title: Output
*/
SELECT weight_kg / (height_m * height_m)
);
In Python(use """ ... """ docstring):
AS $$
"""
schema: user-defined-operator-v0.1.0
type: uc-udf
name: Calculate BMI
id: calculate_bmi
version: "1.0.0"
description: Calculates BMI from weight and height.
config:
type: object
properties:
weight_kg:
type: string
title: Weight (in kg)
format: expression
x-ui:
widget: expression
port: in
height_m:
type: string
title: Height (in meters)
format: expression
x-ui:
widget: expression
port: in
required:
- weight_kg
- height_m
ports:
input:
- name: in
title: Input Data
output:
- name: out
title: Output
"""
return weight_kg / (height_m ** 2)
$$;
将操作员注册并部署到 Lakeflow Designer
若要使操作器显示在 Lakeflow Designer 中,请在 .user_defined_operators.yaml 文件中注册它:
- 工作区级别: 将文件放在工作区的根目录中,使操作员对所有用户可见。
-
用户级别: 将文件放在用户主文件夹 (
/Workspace/Users/<user-name>/.user_defined_operators.yaml) 中,使操作员仅对你可见。
该 operators: 部分支持文件路径、Unity 目录函数引用和 glob 模式。 可以混合输入类型:
operators:
# File path (python-run-function operators)
- /Workspace/Users/me/udos/my_operator.yaml
# Glob pattern (registers all matching files)
- /Workspace/Users/me/udos/transforms/*.yaml
# UC function reference (uc-udf and uc-udtf operators)
- catalog: my_catalog
schema: my_schema
functionName: my_function
高级配置
预览模式
在设计模式下,Lakeflow Designer 支持预览。 对于调用外部 API 或写入外部系统的运算符,请添加配置 is_preview 属性,以便在预览期间跳过副作用。 启用预览模式后,用户需要显式单击“ 运行 ”以执行具有副作用的运算符。
config:
type: object
properties:
is_preview:
type: boolean
format: is_preview
default: false
Lakeflow Designer 会在预览期间自动将此值设置为 true 。 在逻辑中进行判断以跳过副作用:
# In a python-run-function
if config.get("is_preview"):
return {"out": inputs["in"]}
# In a UC function (SQL)
CASE WHEN is_preview THEN 'preview' ELSE /* actual work */ END
Unity Catalog 连接
对于调用外部 API 的基于 Unity Catalog 的 SQL 操作符,请使用 Unity Catalog HTTP 连接来安全存储凭据:
CREATE CONNECTION my_api_connection TYPE HTTP OPTIONS (
host 'https://api.example.com',
port '443',
base_path '/v1/',
bearer_token 'your-token-here'
);
然后在 SQL UDF 中使用该连接和 http_request() 函数。 有关详细信息,请参阅 “连接到外部 HTTP 服务”。
WorkspaceClient
对于 python-run-function 运算符,可以使用 Azure Databricks WorkspaceClient 访问工作区资源和外部 API:
def run(config, inputs, spark):
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
# Use w to access workspace resources
创建完整的 python-run-function 用户定义运算符
以下步骤逐步讲解如何从头开始创建 python-run-function 操作员。
步骤 1:定义逻辑
在笔记本中编写您的 run() 函数:
from typing import Dict, Any
def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
from pyspark.sql import functions as F
df = inputs["in"]
result = df.withColumn(config["column_name"], F.current_timestamp())
return {"out": result}
步骤 2:测试函数
使用示例数据以交互方式测试函数:
test_df = spark.createDataFrame(
[("Alice", 100), ("Bob", 200)],
["name", "amount"]
)
result = run(
config={"column_name": "processed_at"},
inputs={"in": test_df},
spark=spark
)
result["out"].show()
步骤 3:创建 YAML 配置
定义 YAML 文件中的操作员元数据、配置字段和端口:
schema: user-defined-operator-v0.1.0
type: python-run-function
name: Add Timestamp
id: transforms.add_timestamp
version: '1.0.0'
description: Adds a timestamp column to the input DataFrame.
config:
type: object
properties:
column_name:
type: string
title: Column Name
default: processed_at
x-ui:
widget: input
required:
- column_name
步骤 4:合并逻辑和 YAML
添加 run_function 和 ports 字段,以创建完整的 YAML 文件。 将其保存到工作区,例如 /Workspace/Users/<user-name>/udos/add_timestamp.yaml:
schema: user-defined-operator-v0.1.0
type: python-run-function
name: Add Timestamp
id: transforms.add_timestamp
version: '1.0.0'
description: Adds a timestamp column to the input DataFrame.
config:
type: object
properties:
column_name:
type: string
title: Column Name
default: processed_at
x-ui:
widget: input
required:
- column_name
ports:
input:
- name: in
title: Input
output:
- name: out
title: Output
run_function:
type: inline
code: |
from typing import Dict, Any
def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
from pyspark.sql import functions as F
df = inputs["in"]
result = df.withColumn(config["column_name"], F.current_timestamp())
return {"out": result}
步骤 5:注册操作员
将文件路径添加到 .user_defined_operators.yaml 文件:
operators:
- /Workspace/Users/<user-name>/udos/add_timestamp.yaml
步骤 6:在 Lakeflow Designer 中使用运算符
打开 Lakeflow 设计器并验证运算符是否显示在运算符面板中。 将其拖到画布上、连接输入、配置列名并运行预览。
创建完整的 UC 用户定义运算符
以下步骤将介绍如何创建基于 UC 的 uc-udf 运算符。
步骤 1:定义逻辑
在笔记本中编写和测试函数逻辑:
def double_value(input_value: float) -> float:
if input_value is None:
return None
return input_value * 2
步骤 2:创建 YAML 配置
定义操作员元数据、配置字段和端口:
schema: user-defined-operator-v0.1.0
type: uc-udf
name: Double Value
id: math.double_value
version: '1.0.0'
description: Doubles the input value
config:
type: object
properties:
input_value:
type: string
title: Input Value
format: expression
x-ui:
widget: expression
port: input_data
required:
- input_value
ports:
input:
- name: input_data
title: Input
output:
- name: out
title: Output
步骤 3:合并逻辑和 YAML
创建 Unity Catalog 函数,并将 YAML 以内嵌文档字符串(docstring)的形式包含其中:
CREATE OR REPLACE FUNCTION main.my_schema.double_value(input_value DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
AS $$
"""
schema: user-defined-operator-v0.1.0
type: uc-udf
name: Double Value
id: math.double_value
version: "1.0.0"
description: Doubles the input value
config:
type: object
properties:
input_value:
type: string
title: Input Value
format: expression
x-ui:
widget: expression
port: input_data
required:
- input_value
ports:
input:
- name: input_data
title: Input
output:
- name: out
title: Output
"""
def double_value(input_value: float) -> float:
if input_value is None:
return None
return input_value * 2
return double_value(input_value)
$$
步骤 4:测试函数
SELECT main.my_schema.double_value(5) AS result;
-- Should return: 10
步骤 5:注册操作员
将 Unity 目录函数引用添加到 .user_defined_operators.yaml 文件:
operators:
- catalog: main
schema: my_schema
functionName: double_value
步骤 6:在 Lakeflow Designer 中使用运算符
打开 Lakeflow 设计器并验证运算符是否显示在运算符面板中。 将其拖到画布上,连接输入并运行预览。
故障排除
| 问题 | 解决方案 |
|---|---|
| 运算符不会出现在 Lakeflow Designer 中。 | 检查是否存在 .user_defined_operators.yaml,并确认其中列出了你的函数或文件路径。 对于 python-run-function 运算符,请验证文件路径以及 YAML 文件是否可访问。 |
| 架构验证失败。 | 对照 https://your-workspace.cloud.databricks.com/static/schemas/user-defined-operator-v0.1.0.json 中的官方架构定义校验您的 YAML。 |
| 权限被拒绝。” | 对于基于 UC 的操作员,请验证用户是否对该函数具有 EXECUTE 权限,并对该架构具有 USE SCHEMA 权限。 对于 python-run-function 操作员,请验证用户对 YAML 文件的读取访问权限。 |
python-run-function 运算符在运行时失败。 |
检查函数签名是否 run() 匹配 def run(config, inputs, spark)。 验证代码中的端口名称是否与 YAML 匹配,并且返回字典键是否与输出端口 name 值匹配。 |
| UDTF 返回错误类型。 | UDTF 返回类型必须是显式的 — 不能引用输入列类型。 |
Permissions
| 许可 | Purpose |
|---|---|
对 的.user_defined_operators.yaml。 |
了解运算符。 |
对 YAML 文件的读取权限(仅限 python-run-function)。 |
加载运算符定义。 |
| 对 Unity Catalog 函数执行EXECUTE(仅限基于 UC 的操作器)。 | 运行操作器。 |
| 仅在架构上使用 SCHEMA(仅基于 UC 的运算符)。 | 访问在其中创建函数的架构。 |
| 其他权限 | 根据操作员的不同,用户可能需要其他权限。 例如,在 Unity Catalog 连接上使用 USE CONNECTION 进行 HTTP API 调用。 |
后续步骤
浏览以下教程:
| Example | 类型 | Description |
|---|---|---|
| Gmail 电子邮件发件人 | python-run-function |
通过 Gmail 以 CSV 电子邮件附件的形式发送 DataFrame 数据。 |
| 复利计算器 | uc-udf |
使用复利公式计算未来的投资值。 |
| K-Means 群集 | uc-udtf |
使用 scikit-learn 将数据细分为群集。 |
| 所有用户界面组件 | uc-udf |
用于展示所有可用 UI 组件的参考运算符。 |
有关对 YAML 架构的完整引用,请参阅 用户定义的运算符 YAML 参考。