用户定义的运算符 YAML 参考

Important

此功能目前以公共预览版提供。

本页介绍 Lakeflow Designer 中用户定义的运算符的 YAML 配置。 所有运算符类型(uc-udfuc-udtfpython-run-function)都使用user-defined-operator-v0.1.0架构,该架构使用 JSON 架构格式定义配置字段。

有关如何生成用户定义的运算符的信息,请参阅 Lakeflow Designer 中的用户定义的运算符

根属性

每个运算符 YAML 文件都以一组根属性开头,用于标识运算符并定义其行为。 以下示例显示了一般结构:

schema: user-defined-operator-v0.1.0
type: python-run-function
name: My Operator
id: my_operator
version: '1.0.0'
description: >
  What this operator does.
  Can be multiple lines.
config:
  type: object
  properties:
    my_field:
      type: string
      title: My Field
      description: Help text
ports:
  input:
    - name: data
      title: Input Data
  output:
    - name: out
      title: Output
run_function:
  type: inline
  code: |
    def run(config, inputs, spark):
        return {"out": inputs["data"]}
environment:
  environment_version: '4'
  dependencies:
    - 'pandas>=2.0'
财产 类型 必需 Description
schema 字符串 Yes 架构标识符。 必须是 user-defined-operator-v0.1.0
type 字符串 Yes 运算符的类型: uc-udfuc-udtfpython-run-function
name 字符串 Yes 运算符的显示名称。 保持短,以适应 Lakeflow 设计器 UI。 最小长度为 1 个字符。
id 字符串 Yes 运算符类型的唯一标识符。 最小长度为 1 个字符。 请考虑使用命名空间(例如 finance.ml.)对运算符进行分类。
description 字符串 Yes 有关运算符执行的操作的详细说明。 在 UI 中向用户显示。 将 YAML 多行语法 (>) 用于较长的说明。
config 对象 Yes 定义配置字段的 JSON 架构对象。 请参阅 “配置”。
ports 对象 输入和输出端口定义。 请参阅端口
version 字符串 Yes 版本字符串(例如, "1.0.0")。 使用此函数跟踪自己的操作员版本。
run_function 对象 python-run-function 运算符的内联Python代码。 请参阅 run_function
environment 对象 Python环境配置,包括依赖项。 请参阅 environment

Ports

端口定义操作员如何连接到管道中的其他运算符。 对象 ports 包含 inputoutput 数组。

ports:
  input:
    - name: input_data
      title: Input Data
      mime: application/vnd.databricks.dataframe
      allowMultiple: true
      required: true
  output:
    - name: out
      title: Output
财产 类型 必需 Description
name 字符串 Yes 端口的唯一标识符。 在连接和配置引用中使用。
title 字符串 UI 中显示的人工可读标签。
mime 字符串 端口数据的 MIME 类型。 例如,application/vnd.databricks.dataframe
allowMultiple 布尔 如果 true,端口接受多个传入连接。
required 布尔 如果 false,则端口是可选的。 默认值:true

仅接受记录的端口属性。 架构验证拒绝未知密钥(如旧 label 字段)。

端口示例

具有输入和输出端口的 UDF:

ports:
  input:
    - name: in
      title: Input Data
  output:
    - name: out
      title: Output

具有输入和输出端口的 UDTF:

ports:
  input:
    - name: input_data
      title: Input Data
  output:
    - name: clustered_data
      title: Clustered Results

具有多个输入和可选端口的 python-run-function:

ports:
  input:
    - name: main_data
      title: Main Data
    - name: reference_data
      title: Reference Table
      required: false
  output:
    - name: joined_output
      title: Joined Output

配置

config 字段是 JSON 架构对象。 将每个配置字段定义为架构中的属性。 此格式允许你访问标准 JSON 架构验证功能,例如enumminimummaximumexamples

对象 config 必须具有 type: objectproperties 映射。 可以选择包含 required (所需属性名称的数组)和 additionalProperties

config:
  type: object
  properties:
    cluster_count:
      type: number
      title: Number of Clusters
      description: How many clusters to create
      default: 3
      minimum: 1
      maximum: 100
    algorithm:
      type: string
      title: Algorithm
      description: Clustering algorithm to use
      enum: ['kmeans', 'dbscan', 'hierarchical']
      default: kmeans
    feature_col:
      type: string
      title: Feature Column
      description: Column to use as input
      format: expression
      x-ui:
        widget: expression
        port: data
  required: [cluster_count, feature_col]
  additionalProperties: false

配置属性字段

对象中的每个 config.properties 属性都支持以下标准 JSON 架构字段:

领域 类型 Description
type 字符串 数据类型:string、、numberintegerboolean、或arrayobject
title 字符串 UI 中显示的人工可读标签。
description 字符串 向用户显示的帮助文本。
default 任意 字段的默认值。
examples 数组 字段的示例值。
enum 数组 修复了允许值的列表。
format 字符串 语义类型提示。 请参阅 “格式”值
minimum 数字 允许的最小值(对于 numberinteger 类型)。
maximum 数字 允许的最大值(对于 numberinteger 类型)。
items 对象 数组元素的架构(何时typearray为 )。
properties 对象 嵌套属性定义(何时 typeobject)。
required 数组 所需的嵌套属性名称列表(何时 typeobject)。

其他标准 JSON 架构字段(例如minLengthmaxLengthpattern以及const)也受支持。

设置值格式

format配置属性上的字段提供语义类型提示,告知 Lakeflow Designer 如何解释值。 这些提示支持专用 UI 行为和验证。

Format Description
expression 列引用或 SQL 表达式。
table_source 表源引用。
file_source 文件源引用。
column_expressions 列表达式。
sort_expressions 对表达式进行排序。
aggregation_expressions 聚合表达式。
ai_function_expressions AI 函数表达式。
is_preview 自动预览模式标志。 Lakeflow Designer 在工作流预览期间将此设置设置为 true 。 配置属性名称是任意的 , 只有 format: is_preview 标记很重要。 使用此函数可以跳过预览期间外部 API 调用等副作用。
string[] 字符串数组。

UI 小组件

小组件自定义配置字段在 Lakeflow 设计器界面中的呈现方式。 在每个配置属性的属性 x-ui 中定义小组件。 如果省略小组件,Lakeflow Designer 会基于数据类型使用默认小组件。

Widget 数据类型 Description
input 字符串 单行文本输入。
textarea 字符串 多行文本区域。 支持可选 rows 属性。
checkbox 布尔 标准复选框。
toggle 布尔 切换开关。
number number/integer 具有可选约束的数字输入。
slider number/integer 数值范围的可视滑块。 支持可选 step 属性。
select 字符串 单选下拉列表。 需要 optionsSource
multi-select 数组 多选下拉列表。 需要 optionsSource
expression 字符串 列/表达式选择器。 需要 port

input

单行文本输入字段。

api_endpoint:
  type: string
  title: API Endpoint
  x-ui:
    widget: input

textarea

多行文本区域用于较长的内容。 支持用于控制高度的可选 rows 属性。

message_body:
  type: string
  title: Message Body
  x-ui:
    widget: textarea
    rows: 4

checkbox

布尔值的标准复选框。

send_notification:
  type: boolean
  title: Send Notification
  default: false
  x-ui:
    widget: checkbox

toggle

布尔值的切换开关。

enable_logging:
  type: boolean
  title: Enable Logging
  default: true
  x-ui:
    widget: toggle

number

数字输入字段。 使用 minimum 属性本身来 maximum 约束范围。

num_clusters:
  type: number
  title: Number of Clusters
  default: 3
  minimum: 1
  maximum: 100
  x-ui:
    widget: number

slider

用于在范围内选择数值的视觉滑块。 使用 minimum 属性并 maximum 设置范围,并 step 控制 x-ui 增量。

confidence_threshold:
  type: number
  title: Confidence Threshold
  default: 0.8
  minimum: 0
  maximum: 1
  x-ui:
    widget: slider
    step: 0.05

select

单选下拉列表。 需要一个 optionsSource 定义下拉列表值来自何处。 请参阅 选项源

aggregation_type:
  type: string
  title: Aggregation Type
  x-ui:
    widget: select
    optionsSource:
      type: static
      values: ['sum', 'avg', 'min', 'max', 'count']

multi-select

用于选择多个值的多选下拉列表。 与 type: array 属性一起使用 items: { type: string } 。 需要一个 optionsSource。 请参阅 选项源

feature_columns:
  type: array
  title: Feature Columns
  items:
    type: string
  x-ui:
    widget: multi-select
    optionsSource:
      type: inputColumns
      port: input_data

expression

允许用户从输入数据中选择列或编写自定义 SQL 表达式的列/表达式选择器。 在属性上设置format: expression并指定输入portx-ui 这非常有用:

  • 当用户应从输入数据中选择列时。
  • 用户可能想要编写自定义 SQL 表达式时。
  • 对于引用管道中的动态数据的参数。
amount:
  type: string
  title: Amount
  format: expression
  x-ui:
    widget: expression
    port: input_data

选项源

对于 selectmulti-select 组件,必须定义下拉列表选项的使用 optionsSource位置。

静态选项

YAML 中定义的值的固定列表。

optionsSource:
  type: static
  values: ['option1', 'option2', 'option3']
财产 类型 必需 Description
type 字符串 Yes 必须是 static
values 数组 Yes 下拉列表的字符串值的数组。

输入列

使用输入端口中的列名称动态填充下拉列表。

optionsSource:
  type: inputColumns
  port: input_data
财产 类型 必需 Description
type 字符串 Yes 必须是 inputColumns
port 字符串 Yes 要从中获取列名称的输入端口的名称。 必须与其中一个定义的输入端口匹配 name

run_function

使用 run_function 属性,可以直接将Python代码嵌入 python-run-function 运算符的 YAML 配置中。 这样就无需注册单独的 Unity 目录函数。

run_function:
  type: inline
  code: |
    def run(config, inputs, spark):
        df = inputs["data"]
        threshold = config["threshold"]
        return {"out": df.filter(df["score"] > threshold)}
财产 类型 必需 Description
type 字符串 Yes 必须是 inline
code 字符串 Yes Python源代码。 必须定义函数 run()

run() 函数接收三个参数:

  • config:UI 中用户设置的配置值的字典。
  • inputs:将输入端口名称映射到 DataFrame 的字典。
  • spark:当前活动的 SparkSession。

该函数必须返回一个字典,将输出端口名称映射到 DataFrames。 键必须与在中name定义的每个输出端口的字段完全匹配ports.output。 例如,输出端口名为 out

return {"out": result_df}

具有多个输出端口:

return {"match": match_df, "rest": rest_df}

environment

environment 属性指定 python-run-function 运算符的Python环境。 使用它固定环境版本并声明 pip 依赖项。

environment:
  environment_version: '4'
  dependencies:
    - 'scikit-learn>=1.3'
    - 'pandas>=2.0'
财产 类型 必需 Description
environment_version 字符串 要使用的环境版本。 例如,"4"
dependencies 字符串数组 pip 依赖项说明符的列表。 每个条目都遵循标准 pip 语法(例如 "pandas>=2.0", )。

完整示例

基于 UC 的 UDF

此示例定义一个基于 Unity 目录的 UDF 运算符,用于计算复合兴趣。

schema: user-defined-operator-v0.1.0
type: uc-udf
name: Compound Interest
id: finance.compound_interest
version: '1.0.0'
description: >
  Calculates compound interest based on principal, rate, and time period.

config:
  type: object
  properties:
    principal:
      type: string
      title: Principal Amount
      format: expression
      x-ui:
        widget: expression
        port: input_data

    annual_rate:
      type: number
      title: Annual Interest Rate
      default: 5.0
      minimum: 0
      maximum: 100
      x-ui:
        widget: number

    years:
      type: number
      title: Number of Years
      default: 10
      minimum: 1
      maximum: 50
      x-ui:
        widget: slider
        step: 1

    compound_frequency:
      type: string
      title: Compounding Frequency
      default: 'monthly'
      x-ui:
        widget: select
        optionsSource:
          type: static
          values: ['daily', 'monthly', 'quarterly', 'annually']
  required: [principal, annual_rate]
  additionalProperties: false

ports:
  input:
    - name: input_data
      title: Input Data
  output:
    - name: out
      title: Output

Python run-function 运算符

此示例定义使用 python-run-function K-Means 聚类分析对客户进行细分的操作员。

schema: user-defined-operator-v0.1.0
type: python-run-function
name: Customer Segmentation
id: ml.customer_segmentation
version: '1.2.0'
description: >
  Segments customers into groups based on selected features
  using K-Means clustering. Returns customer IDs with their
  assigned segment numbers.

config:
  type: object
  properties:
    num_segments:
      type: integer
      title: Number of Segments
      description: How many customer segments to create
      default: 3
      minimum: 2
      maximum: 20
      x-ui:
        widget: number
    customer_id_column:
      type: string
      title: Customer ID Column
      description: Column containing customer identifiers
      x-ui:
        widget: select
        optionsSource:
          type: inputColumns
          port: customer_data
    feature_columns:
      type: array
      title: Feature Columns
      description: Columns to use for segmentation
      items:
        type: string
      x-ui:
        widget: multi-select
        optionsSource:
          type: inputColumns
          port: customer_data
    normalize_features:
      type: boolean
      title: Normalize Features
      description: Whether to normalize feature values before clustering
      default: true
      x-ui:
        widget: toggle
  required: [num_segments, customer_id_column, feature_columns]
  additionalProperties: false

ports:
  input:
    - name: customer_data
      title: Customer Data
      mime: application/vnd.databricks.dataframe
  output:
    - name: segmented_customers
      title: Segmented Customers

run_function:
  type: inline
  code: |
    def run(config, inputs, spark):
        from pyspark.ml.feature import VectorAssembler, StandardScaler
        from pyspark.ml.clustering import KMeans

        df = inputs["customer_data"]
        id_col = config["customer_id_column"]
        features = config["feature_columns"]
        k = config["num_segments"]
        normalize = config.get("normalize_features", True)

        assembler = VectorAssembler(inputCols=features, outputCol="features_vec")
        assembled = assembler.transform(df)

        if normalize:
            scaler = StandardScaler(inputCol="features_vec", outputCol="scaled_features")
            model = scaler.fit(assembled)
            assembled = model.transform(assembled)
            feature_col = "scaled_features"
        else:
            feature_col = "features_vec"

        kmeans = KMeans(k=k, featuresCol=feature_col, predictionCol="segment")
        result = kmeans.fit(assembled).transform(assembled)

        return {"segmented_customers": result.select(id_col, "segment")}

environment:
  environment_version: '4'
  dependencies:
    - 'scikit-learn>=1.3'

快速参考

所需的根属性

  • schema: user-defined-operator-v0.1.0
  • name:显示名称
  • id:唯一标识符
  • description:运算符的作用
  • config:JSON 架构对象
  • typeuc-udfuc-udtfpython-run-function
  • version:作者定义的版本字符串

可选根属性

  • ports:输入和输出端口定义
  • run_function:内联Python代码(仅限 python-run-function
  • environment:仅Python环境和依赖项(仅限 python-run-function

配置属性数据类型

string | boolean | number | integer | array | object

UI 小组件

input | textarea | checkbox | toggle | number | slider | select | multi-select | expression

选项源

static (固定值) | inputColumns (来自输入端口)

设置值格式

expression | table_source | file_source | column_expressions | sort_expressions | aggregation_expressions | ai_function_expressions | is_preview | string[]