Important
此功能目前以公共预览版提供。
本页介绍 Lakeflow Designer 中用户定义的运算符的 YAML 配置。 所有运算符类型(uc-udf和uc-udtfpython-run-function)都使用user-defined-operator-v0.1.0架构,该架构使用 JSON 架构格式定义配置字段。
有关如何生成用户定义的运算符的信息,请参阅 Lakeflow Designer 中的用户定义的运算符。
根属性
每个运算符 YAML 文件都以一组根属性开头,用于标识运算符并定义其行为。 以下示例显示了一般结构:
schema: user-defined-operator-v0.1.0
type: python-run-function
name: My Operator
id: my_operator
version: '1.0.0'
description: >
What this operator does.
Can be multiple lines.
config:
type: object
properties:
my_field:
type: string
title: My Field
description: Help text
ports:
input:
- name: data
title: Input Data
output:
- name: out
title: Output
run_function:
type: inline
code: |
def run(config, inputs, spark):
return {"out": inputs["data"]}
environment:
environment_version: '4'
dependencies:
- 'pandas>=2.0'
| 财产 | 类型 | 必需 | Description |
|---|---|---|---|
schema |
字符串 | Yes | 架构标识符。 必须是 user-defined-operator-v0.1.0。 |
type |
字符串 | Yes | 运算符的类型: uc-udf、 uc-udtf或 python-run-function。 |
name |
字符串 | Yes | 运算符的显示名称。 保持短,以适应 Lakeflow 设计器 UI。 最小长度为 1 个字符。 |
id |
字符串 | Yes | 运算符类型的唯一标识符。 最小长度为 1 个字符。 请考虑使用命名空间(例如 finance. 或 ml.)对运算符进行分类。 |
description |
字符串 | Yes | 有关运算符执行的操作的详细说明。 在 UI 中向用户显示。 将 YAML 多行语法 (>) 用于较长的说明。 |
config |
对象 | Yes | 定义配置字段的 JSON 架构对象。 请参阅 “配置”。 |
ports |
对象 | 否 | 输入和输出端口定义。 请参阅端口。 |
version |
字符串 | Yes | 版本字符串(例如, "1.0.0")。 使用此函数跟踪自己的操作员版本。 |
run_function |
对象 | 否 |
python-run-function 运算符的内联Python代码。 请参阅 run_function。 |
environment |
对象 | 否 | Python环境配置,包括依赖项。 请参阅 environment。 |
Ports
端口定义操作员如何连接到管道中的其他运算符。 对象 ports 包含 input 和 output 数组。
ports:
input:
- name: input_data
title: Input Data
mime: application/vnd.databricks.dataframe
allowMultiple: true
required: true
output:
- name: out
title: Output
| 财产 | 类型 | 必需 | Description |
|---|---|---|---|
name |
字符串 | Yes | 端口的唯一标识符。 在连接和配置引用中使用。 |
title |
字符串 | 否 | UI 中显示的人工可读标签。 |
mime |
字符串 | 否 | 端口数据的 MIME 类型。 例如,application/vnd.databricks.dataframe。 |
allowMultiple |
布尔 | 否 | 如果 true,端口接受多个传入连接。 |
required |
布尔 | 否 | 如果 false,则端口是可选的。 默认值:true。 |
仅接受记录的端口属性。 架构验证拒绝未知密钥(如旧 label 字段)。
端口示例
具有输入和输出端口的 UDF:
ports:
input:
- name: in
title: Input Data
output:
- name: out
title: Output
具有输入和输出端口的 UDTF:
ports:
input:
- name: input_data
title: Input Data
output:
- name: clustered_data
title: Clustered Results
具有多个输入和可选端口的 python-run-function:
ports:
input:
- name: main_data
title: Main Data
- name: reference_data
title: Reference Table
required: false
output:
- name: joined_output
title: Joined Output
配置
该 config 字段是 JSON 架构对象。 将每个配置字段定义为架构中的属性。 此格式允许你访问标准 JSON 架构验证功能,例如enum,minimummaximum和examples。
对象 config 必须具有 type: object 和 properties 映射。 可以选择包含 required (所需属性名称的数组)和 additionalProperties。
config:
type: object
properties:
cluster_count:
type: number
title: Number of Clusters
description: How many clusters to create
default: 3
minimum: 1
maximum: 100
algorithm:
type: string
title: Algorithm
description: Clustering algorithm to use
enum: ['kmeans', 'dbscan', 'hierarchical']
default: kmeans
feature_col:
type: string
title: Feature Column
description: Column to use as input
format: expression
x-ui:
widget: expression
port: data
required: [cluster_count, feature_col]
additionalProperties: false
配置属性字段
对象中的每个 config.properties 属性都支持以下标准 JSON 架构字段:
| 领域 | 类型 | Description |
|---|---|---|
type |
字符串 | 数据类型:string、、number、integerboolean、或arrayobject。 |
title |
字符串 | UI 中显示的人工可读标签。 |
description |
字符串 | 向用户显示的帮助文本。 |
default |
任意 | 字段的默认值。 |
examples |
数组 | 字段的示例值。 |
enum |
数组 | 修复了允许值的列表。 |
format |
字符串 | 语义类型提示。 请参阅 “格式”值。 |
minimum |
数字 | 允许的最小值(对于 number 和 integer 类型)。 |
maximum |
数字 | 允许的最大值(对于 number 和 integer 类型)。 |
items |
对象 | 数组元素的架构(何时typearray为 )。 |
properties |
对象 | 嵌套属性定义(何时 type 为 object)。 |
required |
数组 | 所需的嵌套属性名称列表(何时 type 为 object)。 |
其他标准 JSON 架构字段(例如minLength,maxLengthpattern以及const)也受支持。
设置值格式
format配置属性上的字段提供语义类型提示,告知 Lakeflow Designer 如何解释值。 这些提示支持专用 UI 行为和验证。
| Format | Description |
|---|---|
expression |
列引用或 SQL 表达式。 |
table_source |
表源引用。 |
file_source |
文件源引用。 |
column_expressions |
列表达式。 |
sort_expressions |
对表达式进行排序。 |
aggregation_expressions |
聚合表达式。 |
ai_function_expressions |
AI 函数表达式。 |
is_preview |
自动预览模式标志。 Lakeflow Designer 在工作流预览期间将此设置设置为 true 。 配置属性名称是任意的 , 只有 format: is_preview 标记很重要。 使用此函数可以跳过预览期间外部 API 调用等副作用。 |
string[] |
字符串数组。 |
UI 小组件
小组件自定义配置字段在 Lakeflow 设计器界面中的呈现方式。 在每个配置属性的属性 x-ui 中定义小组件。 如果省略小组件,Lakeflow Designer 会基于数据类型使用默认小组件。
| Widget | 数据类型 | Description |
|---|---|---|
input |
字符串 | 单行文本输入。 |
textarea |
字符串 | 多行文本区域。 支持可选 rows 属性。 |
checkbox |
布尔 | 标准复选框。 |
toggle |
布尔 | 切换开关。 |
number |
number/integer | 具有可选约束的数字输入。 |
slider |
number/integer | 数值范围的可视滑块。 支持可选 step 属性。 |
select |
字符串 | 单选下拉列表。 需要 optionsSource。 |
multi-select |
数组 | 多选下拉列表。 需要 optionsSource。 |
expression |
字符串 | 列/表达式选择器。 需要 port。 |
input
单行文本输入字段。
api_endpoint:
type: string
title: API Endpoint
x-ui:
widget: input
textarea
多行文本区域用于较长的内容。 支持用于控制高度的可选 rows 属性。
message_body:
type: string
title: Message Body
x-ui:
widget: textarea
rows: 4
checkbox
布尔值的标准复选框。
send_notification:
type: boolean
title: Send Notification
default: false
x-ui:
widget: checkbox
toggle
布尔值的切换开关。
enable_logging:
type: boolean
title: Enable Logging
default: true
x-ui:
widget: toggle
number
数字输入字段。 使用 minimum 属性本身来 maximum 约束范围。
num_clusters:
type: number
title: Number of Clusters
default: 3
minimum: 1
maximum: 100
x-ui:
widget: number
slider
用于在范围内选择数值的视觉滑块。 使用 minimum 属性并 maximum 设置范围,并 step 控制 x-ui 增量。
confidence_threshold:
type: number
title: Confidence Threshold
default: 0.8
minimum: 0
maximum: 1
x-ui:
widget: slider
step: 0.05
select
单选下拉列表。 需要一个 optionsSource 定义下拉列表值来自何处。 请参阅 选项源。
aggregation_type:
type: string
title: Aggregation Type
x-ui:
widget: select
optionsSource:
type: static
values: ['sum', 'avg', 'min', 'max', 'count']
multi-select
用于选择多个值的多选下拉列表。 与 type: array 属性一起使用 items: { type: string } 。 需要一个 optionsSource。 请参阅 选项源。
feature_columns:
type: array
title: Feature Columns
items:
type: string
x-ui:
widget: multi-select
optionsSource:
type: inputColumns
port: input_data
expression
允许用户从输入数据中选择列或编写自定义 SQL 表达式的列/表达式选择器。 在属性上设置format: expression并指定输入port。x-ui 这非常有用:
- 当用户应从输入数据中选择列时。
- 用户可能想要编写自定义 SQL 表达式时。
- 对于引用管道中的动态数据的参数。
amount:
type: string
title: Amount
format: expression
x-ui:
widget: expression
port: input_data
选项源
对于 select 小 multi-select 组件,必须定义下拉列表选项的使用 optionsSource位置。
静态选项
YAML 中定义的值的固定列表。
optionsSource:
type: static
values: ['option1', 'option2', 'option3']
| 财产 | 类型 | 必需 | Description |
|---|---|---|---|
type |
字符串 | Yes | 必须是 static。 |
values |
数组 | Yes | 下拉列表的字符串值的数组。 |
输入列
使用输入端口中的列名称动态填充下拉列表。
optionsSource:
type: inputColumns
port: input_data
| 财产 | 类型 | 必需 | Description |
|---|---|---|---|
type |
字符串 | Yes | 必须是 inputColumns。 |
port |
字符串 | Yes | 要从中获取列名称的输入端口的名称。 必须与其中一个定义的输入端口匹配 name 。 |
run_function
使用 run_function 属性,可以直接将Python代码嵌入 python-run-function 运算符的 YAML 配置中。 这样就无需注册单独的 Unity 目录函数。
run_function:
type: inline
code: |
def run(config, inputs, spark):
df = inputs["data"]
threshold = config["threshold"]
return {"out": df.filter(df["score"] > threshold)}
| 财产 | 类型 | 必需 | Description |
|---|---|---|---|
type |
字符串 | Yes | 必须是 inline。 |
code |
字符串 | Yes | Python源代码。 必须定义函数 run() 。 |
该 run() 函数接收三个参数:
-
config:UI 中用户设置的配置值的字典。 -
inputs:将输入端口名称映射到 DataFrame 的字典。 -
spark:当前活动的 SparkSession。
该函数必须返回一个字典,将输出端口名称映射到 DataFrames。 键必须与在中name定义的每个输出端口的字段完全匹配ports.output。 例如,输出端口名为 out:
return {"out": result_df}
具有多个输出端口:
return {"match": match_df, "rest": rest_df}
environment
environment 属性指定 python-run-function 运算符的Python环境。 使用它固定环境版本并声明 pip 依赖项。
environment:
environment_version: '4'
dependencies:
- 'scikit-learn>=1.3'
- 'pandas>=2.0'
| 财产 | 类型 | 必需 | Description |
|---|---|---|---|
environment_version |
字符串 | 否 | 要使用的环境版本。 例如,"4"。 |
dependencies |
字符串数组 | 否 | pip 依赖项说明符的列表。 每个条目都遵循标准 pip 语法(例如 "pandas>=2.0", )。 |
完整示例
基于 UC 的 UDF
此示例定义一个基于 Unity 目录的 UDF 运算符,用于计算复合兴趣。
schema: user-defined-operator-v0.1.0
type: uc-udf
name: Compound Interest
id: finance.compound_interest
version: '1.0.0'
description: >
Calculates compound interest based on principal, rate, and time period.
config:
type: object
properties:
principal:
type: string
title: Principal Amount
format: expression
x-ui:
widget: expression
port: input_data
annual_rate:
type: number
title: Annual Interest Rate
default: 5.0
minimum: 0
maximum: 100
x-ui:
widget: number
years:
type: number
title: Number of Years
default: 10
minimum: 1
maximum: 50
x-ui:
widget: slider
step: 1
compound_frequency:
type: string
title: Compounding Frequency
default: 'monthly'
x-ui:
widget: select
optionsSource:
type: static
values: ['daily', 'monthly', 'quarterly', 'annually']
required: [principal, annual_rate]
additionalProperties: false
ports:
input:
- name: input_data
title: Input Data
output:
- name: out
title: Output
Python run-function 运算符
此示例定义使用 python-run-function K-Means 聚类分析对客户进行细分的操作员。
schema: user-defined-operator-v0.1.0
type: python-run-function
name: Customer Segmentation
id: ml.customer_segmentation
version: '1.2.0'
description: >
Segments customers into groups based on selected features
using K-Means clustering. Returns customer IDs with their
assigned segment numbers.
config:
type: object
properties:
num_segments:
type: integer
title: Number of Segments
description: How many customer segments to create
default: 3
minimum: 2
maximum: 20
x-ui:
widget: number
customer_id_column:
type: string
title: Customer ID Column
description: Column containing customer identifiers
x-ui:
widget: select
optionsSource:
type: inputColumns
port: customer_data
feature_columns:
type: array
title: Feature Columns
description: Columns to use for segmentation
items:
type: string
x-ui:
widget: multi-select
optionsSource:
type: inputColumns
port: customer_data
normalize_features:
type: boolean
title: Normalize Features
description: Whether to normalize feature values before clustering
default: true
x-ui:
widget: toggle
required: [num_segments, customer_id_column, feature_columns]
additionalProperties: false
ports:
input:
- name: customer_data
title: Customer Data
mime: application/vnd.databricks.dataframe
output:
- name: segmented_customers
title: Segmented Customers
run_function:
type: inline
code: |
def run(config, inputs, spark):
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.clustering import KMeans
df = inputs["customer_data"]
id_col = config["customer_id_column"]
features = config["feature_columns"]
k = config["num_segments"]
normalize = config.get("normalize_features", True)
assembler = VectorAssembler(inputCols=features, outputCol="features_vec")
assembled = assembler.transform(df)
if normalize:
scaler = StandardScaler(inputCol="features_vec", outputCol="scaled_features")
model = scaler.fit(assembled)
assembled = model.transform(assembled)
feature_col = "scaled_features"
else:
feature_col = "features_vec"
kmeans = KMeans(k=k, featuresCol=feature_col, predictionCol="segment")
result = kmeans.fit(assembled).transform(assembled)
return {"segmented_customers": result.select(id_col, "segment")}
environment:
environment_version: '4'
dependencies:
- 'scikit-learn>=1.3'
快速参考
所需的根属性
-
schema:user-defined-operator-v0.1.0 -
name:显示名称 -
id:唯一标识符 -
description:运算符的作用 -
config:JSON 架构对象 -
type:uc-udf、uc-udtf或python-run-function -
version:作者定义的版本字符串
可选根属性
-
ports:输入和输出端口定义 -
run_function:内联Python代码(仅限python-run-function) -
environment:仅Python环境和依赖项(仅限python-run-function)
配置属性数据类型
string | boolean | number | integer | array | object
UI 小组件
input | textarea | checkbox | toggle | number | slider | select | multi-select | expression
选项源
static (固定值) | inputColumns (来自输入端口)
设置值格式
expression | table_source | file_source | column_expressions | sort_expressions | aggregation_expressions | ai_function_expressions | is_preview | string[]