教程:K-means 聚类分析

Important

此功能目前以公共预览版提供。

本教程演示如何为 Lakeflow Designer 生成Python用户定义的表函数 (UDTF) 运算符,该运算符使用 scikit-learn 运行 K-means 群集。 UDTF 非常适合需要处理整个数据集的机器学习任务。 有关用户定义的运算符的背景信息,请参阅 Lakeflow Designer 中的用户定义的运算符

概述

本教程逐步讲解如何使用 Python创建 UDTF 用户定义的运算符。 操作员对所选列执行 K-Means 聚类分析,允许用户:

  • 选择要用作特征的列。
  • 指定群集数。
  • 返回一个表,其中包含每一行的群集分配。

步骤 1:理解 UDTF 处理程序模式

UDTF 作为具有两个关键方法的 Python 类实现:

  • eval(row, ...) — 对每个输入行调用,用于累积数据
  • terminate() — 在处理所有行后调用以生成结果

这种模式使 UDTF 能够:

  1. eval() 调用期间收集所有数据点
  2. terminate() 中训练 K-Means 模型
  3. 逐行返回聚类结果
class SklearnKMeans:
    def __init__(self):
        self.id_col = None
        self.feature_cols = None
        self.k = None
        self.rows = []
        self.features = []

    def eval(self, row, id_column, columns, k):
        """Called one time per input row - accumulate data here."""
        # Initialize configuration on first row
        if self.id_col is None:
            self.id_col = id_column
        if self.feature_cols is None:
            self.feature_cols = columns
        if self.k is None:
            self.k = max(1, int(k))

        # Convert row to dictionary and store
        row_dict = row.asDict(recursive=False)
        self.rows.append(row_dict)

        # Extract numeric features
        feats = []
        for c in self.feature_cols:
            v = row_dict.get(c)
            if v is None:
                v = 0.0
            feats.append(float(v))
        self.features.append(feats)

    def terminate(self):
        """Called after all rows - train model and yield results."""
        import numpy as np
        from sklearn.cluster import KMeans

        if not self.rows:
            return

        X = np.asarray(self.features, dtype=float)
        n_samples = X.shape[0]
        n_clusters = min(self.k, n_samples)

        model = KMeans(
            n_clusters=n_clusters,
            n_init=10,
            random_state=42
        )
        labels = model.fit_predict(X)

        # Yield results row by row
        for row_dict, label in zip(self.rows, labels):
            yield str(row_dict[self.id_col]), int(label)

注释

row 中的 eval() 参数是一个 PySpark Row 对象。 使用 .asDict() 将其转换为字典,以便于访问。

步骤 2:为操作员创建 YAML

YAML 配置定义运算符在 Lakeflow Designer 中的显示方式。 对于此运算符:

  • Number 参数k): 要创建的群集数
  • 选择部件 (id_column): 包含输入表中各列的下拉列表
  • 多选控件 (columns): 选择多个功能列
  • optionsSource:根据输入表结构自动填充下拉列表
  • 输入端口:指定此运算符接受表格数据
schema: user-defined-operator-v0.1.0
type: uc-udtf
name: K-Means Clustering
id: kmeans
version: '1.0.0'
description: Perform K-Means clustering on selected columns
config:
  type: object
  properties:
    k:
      type: number
      title: Number of Clusters
      default: 3
      minimum: 1
      maximum: 100
      x-ui:
        widget: number
    id_column:
      type: string
      title: ID Column
      x-ui:
        widget: select
        optionsSource:
          type: inputColumns
          port: input_data
    columns:
      type: array
      items:
        type: string
      title: Feature Columns
      x-ui:
        widget: multi-select
        optionsSource:
          type: inputColumns
          port: input_data
  required:
    - k
    - id_column
    - columns
  additionalProperties: false
ports:
  input:
    - name: input_data
      title: Input Data
  output:
    - name: output
      title: Clustered Data

有关所有可用属性、数据类型、控件和选项的完整指南,请参阅用户定义运算符 YAML 参考

步骤 3:创建 Unity 目录函数

将 YAML 配置和Python处理程序类合并为单个 CREATE FUNCTION 语句。

CREATE OR REPLACE FUNCTION main.my_schema.k_means(
    input_data TABLE,
    id_column STRING,
    columns ARRAY<STRING>,
    k INT
)
RETURNS TABLE (
    id STRING,
    cluster_id INT
)
LANGUAGE PYTHON
HANDLER 'SklearnKMeans'
AS $$
"""
schema: user-defined-operator-v0.1.0
type: uc-udtf
name: K-Means Clustering
id: kmeans
version: "1.0.0"
description: Perform K-Means clustering on selected columns
config:
  type: object
  properties:
    k:
      type: number
      title: Number of Clusters
      default: 3
      minimum: 1
      maximum: 100
      x-ui:
        widget: number
    id_column:
      type: string
      title: ID Column
      x-ui:
        widget: select
        optionsSource:
          type: inputColumns
          port: input_data
    columns:
      type: array
      items:
        type: string
      title: Feature Columns
      x-ui:
        widget: multi-select
        optionsSource:
          type: inputColumns
          port: input_data
  required:
    - k
    - id_column
    - columns
  additionalProperties: false
ports:
  input:
    - name: input_data
      title: Input Data
  output:
    - name: output
      title: Clustered Data
"""

class SklearnKMeans:
    def __init__(self):
        self.id_col = None
        self.feature_cols = None
        self.k = None
        self.rows = []
        self.features = []

    def eval(self, row, id_column, columns, k):
        if self.id_col is None:
            self.id_col = id_column
        if self.feature_cols is None:
            self.feature_cols = columns
        if self.k is None:
            self.k = max(1, int(k))

        row_dict = row.asDict(recursive=False)
        self.rows.append(row_dict)

        feats = []
        for c in self.feature_cols:
            v = row_dict.get(c)
            if v is None:
                v = 0.0
            feats.append(float(v))
        self.features.append(feats)

    def terminate(self):
        import numpy as np
        from sklearn.cluster import KMeans

        if not self.rows:
            return

        X = np.asarray(self.features, dtype=float)
        n_samples = X.shape[0]
        n_clusters = min(self.k, n_samples)

        model = KMeans(
            n_clusters=n_clusters,
            n_init=10,
            random_state=42
        )
        labels = model.fit_predict(X)

        for row_dict, label in zip(self.rows, labels):
            yield str(row_dict[self.id_col]), int(label)
$$

步骤 4:使用示例数据进行测试

创建用于测试的示例客户数据:

-- Create sample customer data
CREATE OR REPLACE TEMP VIEW customers AS
SELECT * FROM VALUES
    ('C001', 25, 35000, 20),
    ('C002', 45, 85000, 80),
    ('C003', 35, 55000, 50),
    ('C004', 50, 95000, 90),
    ('C005', 23, 30000, 15),
    ('C006', 40, 75000, 70),
    ('C007', 60, 100000, 95),
    ('C008', 30, 45000, 40)
AS t(customer_id, age, annual_income, spending_score);

测试 K-Means UDTF:

-- Run K-Means clustering with 3 clusters
SELECT * FROM main.my_schema.k_means(
    input_data => TABLE(SELECT * FROM customers) WITH SINGLE PARTITION,
    k => 3,
    id_column => 'customer_id',
    columns => array('age', 'annual_income', 'spending_score')
)

在这种情况下,你想要将聚类分析结果联接回原始数据,以查看群集分配:

-- Join cluster results with original data
SELECT
  c.*,
  k.cluster_id
FROM customers c
INNER JOIN main.my_schema.k_means(
    input_data => TABLE(SELECT * FROM customers) WITH SINGLE PARTITION,
    k => 3,
    id_column => 'customer_id',
    columns => array('age', 'annual_income', 'spending_score')
) k
ON c.customer_id = k.id
ORDER BY k.cluster_id, c.customer_id

步骤 5:注册操作员

若要在 Lakeflow Designer 中使用该运算符,您必须先注册它,方法是将其添加到您的 .user_defined_operators.yaml 文件中:

operators:
  - catalog: main
    schema: my_schema
    functionName: k_means

注释

如果在用户文件夹中定义此文件,则只会为你显示该文件。 有关更多信息,请参阅 使 Operator 可被发现

步骤 6:设置权限

向需要使用此操作员的用户授予访问权限:

GRANT USE SCHEMA ON SCHEMA main.my_schema TO `<user>`;
GRANT EXECUTE ON FUNCTION main.my_schema.k_means TO `<user>`;

在 Lakeflow Designer 中使用运算符

注册后,该操作器会显示在 Lakeflow Designer 中,并具有以下内容:

  • 用于连接数据源的输入端口
  • 用于选择唯一标识各行的列的下拉列表
  • 用于选择哪些列作为聚类特征的多选控件
  • 用于输入所需群集数量的数字输入框

用户无需编写代码即可将客户、产品或任何其他数据细分为有意义的组。

构建 UDTF 的提示

  1. 初始化状态 __init__ — 设置空列表/变量以累积数据
  2. eval中累积 — 先不要处理,只收集数据
  3. terminate中的流程——这里才是真正处理工作的地方
  4. 使用 yield 返回行 — 从 terminate 逐个返回结果
  5. 处理边缘情况 — 如果行数少于群集,该怎么办?
  6. 使类型保持显式 — UDTF 返回无法引用输入类型