CREATE STREAMING TABLE(DLT)

流式处理表是支持流式处理或增量数据处理的表。 管道笔记本中定义的流式处理表由 DLT 管道提供支持。 每次刷新流式处理表时,添加到源表的数据都会追加到流式处理表中。 可以手动或按计划刷新流式处理表。

要详细了解如何执行或计划刷新,请参阅在 DLT 管道上运行更新

语法

CREATE [OR REFRESH] [PRIVATE] STREAMING TABLE
  table_name
  [ table_specification ]
  [ table_clauses ]
  [ AS query ]

table_specification
  ( { column_identifier column_type [column_properties] } [, ...]
    [ column_constraint ] [, ...]
    [ , table_constraint ] [...] )

   column_properties
      { NOT NULL | COMMENT column_comment | column_constraint | MASK clause } [ ... ]

table_clauses
  { USING DELTA
    PARTITIONED BY (col [, ...]) |
    CLUSTER BY clause |
    LOCATION path |
    COMMENT view_comment |
    TBLPROPERTIES clause |
    WITH { ROW FILTER clause } } [ ... ]

参数

  • 刷新

    如果指定,将创建表或更新现有表及其内容。

  • 专用

    创建专用流式处理表。

    • 它们不会添加到目录中,并且只能在定义管道中访问
    • 它们可以与目录中的现有对象同名。 在管道中,如果专用流式处理表和目录中的对象同名,则对该名称的引用将解析为专用流式处理表。
    • 专用流式处理表仅在管道的生存期内保留,而不仅仅是单个更新。

    专用流式处理表以前是使用 TEMPORARY 参数创建的。

  • table_name

    新创建的表的名称。 完全限定的表名称必须是独一无二的。

  • table_specification

    此可选子句定义列的列表、列的类型、属性、说明和列约束。

  • table_constraint

    重要

    此功能目前以公共预览版提供。

    指定架构时,可以定义主键和外键。 约束具备信息性,系统不会强制执行。 请参阅 SQL 语言参考中的 CONSTRAINT 子句

    注意

    若要定义表约束,管道必须是启用了 Unity Catalog 的管道。

  • table_clauses

    (可选)指定表的分区、注释和用户定义的属性。 每个子句只能指定一次。

    • 使用 DELTA

      指定数据格式。 唯一的选项是 DELTA。

      此子句是可选的,默认为 DELTA。

    • PARTITIONED BY

      包含一列或多列的可选列表,用于对表进行分区。 与 CLUSTER BY 互斥。

      Liquid 聚类分析为聚类分析提供了灵活的优化解决方案。 请考虑对 DLT 使用 CLUSTER BY 而不是 PARTITIONED BY

    • 按组排序

      对表启用动态聚类,并定义要用作聚类键的列。 与 PARTITIONED BY 互斥。

      请参阅 对 Delta 表使用液体聚类分析

    • LOCATION

      表数据的可选存储位置。 如果未设置,系统将默认为管道存储位置。

    • 评论

      用于描述表的可选 STRING 文本。

    • TBLPROPERTIES

      表的表属性可选列表。

    • WITH ROW FILTER

    重要

    此功能目前以公共预览版提供。

    向表中添加行筛选器函数。 将来对该表的查询会收到函数计算结果为 TRUE 的行的子集。 这对于精细的访问控制很有用,因为它允许函数检查调用用户的标识和组成员身份以决定是否筛选某些行。

    请参阅 ROW FILTER 条款

  • 查询

    此子句使用 query 中的数据来填充表。 此查询必须是流式处理查询。 要使用流式处理语义从源中读取,请使用 STREAM 关键字。 如果读取遇到对现有记录的更改或删除,则会引发错误。 从静态源或仅限追加的源读取是最安全的。 若要引入具有更改提交的数据,可以使用 Python 和 SkipChangeCommits 选项来处理错误。

    同时指定 querytable_specification 时,table_specification 中指定的表架构必须包含 query 返回的所有列,否则会出现错误。 在 table_specification 中指定但未由 query 返回的任何列在查询时都返回 null 值。

    有关流数据的详细信息,请参阅 使用管道转换数据

所需的权限

管道的运行方式用户必须具有以下权限:

  • 对流式处理表引用的基表的 SELECT 特权。
  • 对父目录的 USE CATALOG 特权和对父架构的 USE SCHEMA 特权。
  • 对流式处理表架构的 CREATE MATERIALIZED VIEW 特权。

为了使用户能够更新在其中定义流式处理表的管道,他们需要:

  • 对父目录的 USE CATALOG 特权和对父架构的 USE SCHEMA 特权。
  • 流式处理表的所有权或对流式处理表的 REFRESH 特权。
  • 流式处理表所有者必须对流式处理表引用的基表具有 SELECT 特权。

要使用户能够查询生成的流式处理表,他们需要:

  • 对父目录的 USE CATALOG 特权和对父架构的 USE SCHEMA 特权。
  • SELECT 流式处理表的特权。

局限性

  • 只有表所有者才能刷新流式处理表以获取最新数据。
  • 流式处理表上不允许 ALTER TABLE 命令。 应该通过 CREATE OR REFRESHALTER STREAMING TABLE 语句更改该表的定义和属性。
  • 不支持通过 DML 命令(如 INSERT INTOMERGE)来发展表模式。
  • 流式数据表不支持以下命令:
    • CREATE TABLE ... CLONE <streaming_table>
    • COPY INTO
    • ANALYZE TABLE
    • RESTORE
    • TRUNCATE
    • GENERATE MANIFEST
    • [CREATE OR] REPLACE TABLE
  • 不支持重命名表或更改所有者。
  • 不支持生成的列、标识列和默认列。

例子

-- Define a streaming table from a volume of files:
CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/customers/*", format => "csv")

-- Define a streaming table from a streaming source table:
CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)

-- Define a table with a row filter and column mask:
CREATE OR REFRESH STREAMING TABLE customers_silver (
  id int COMMENT 'This is the customer ID',
  name string,
  region string,
  ssn string MASK catalog.schema.ssn_mask_fn COMMENT 'SSN masked for privacy'
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT * FROM STREAM(customers_bronze)

-- Define a streaming table that you can add flows into:
CREATE OR REFRESH STREAMING TABLE orders;