流式处理表是支持流式处理或增量数据处理的表。 流表由 Lakeflow 声明性管道支持。 每次刷新流式处理表时,添加到源表的数据都会追加到流式处理表中。 可以手动或按计划刷新流式处理表。
若要详细了解如何执行或计划刷新,请参阅 在 Lakeflow 声明性管道中运行更新。
语法
CREATE [OR REFRESH] [PRIVATE] STREAMING TABLE
table_name
[ table_specification ]
[ table_clauses ]
[ AS query ]
table_specification
( { column_identifier column_type [column_properties] } [, ...]
[ column_constraint ] [, ...]
[ , table_constraint ] [...] )
column_properties
{ NOT NULL | COMMENT column_comment | column_constraint | MASK clause } [ ... ]
table_clauses
{ USING DELTA
PARTITIONED BY (col [, ...]) |
CLUSTER BY clause |
LOCATION path |
COMMENT view_comment |
TBLPROPERTIES clause |
WITH { ROW FILTER clause } } [ ... ]
参数
刷新
如果指定,将创建表或更新现有表及其内容。
专用
创建专用流式处理表。
- 它们不会添加到目录中,并且只能在定义管道中访问
- 它们可以与目录中的现有对象同名。 在管道中,如果专用流式处理表和目录中的对象同名,则对该名称的引用将解析为专用流式处理表。
- 专用流式处理表仅在管道的生存期内保留,而不仅仅是单个更新。
专用流式处理表以前是使用
TEMPORARY
参数创建的。table_name
新创建的表的名称。 完全限定的表名称必须是独一无二的。
table_specification
此可选子句定义列的列表、列的类型、属性、说明和列约束。
-
列名必须具有唯一性,并映射到查询的输出列。
-
指定列的数据类型。 并非 Azure Databricks 支持的所有数据类型都受流式处理表支持。
column_comment
描述列的可选
STRING
文本。 此选项必须与column_type
一起指定。 如果未指定列类型,则会跳过列注释。-
添加一个约束,用于在数据流入表时验证数据。 请参阅通过管道预期管理数据质量。
-
重要
此功能目前以公共预览版提供。
添加列掩码函数以对敏感数据进行匿名化处理。
请参阅 行筛选器和列掩码。
-
table_constraint
重要
此功能目前以公共预览版提供。
指定架构时,可以定义主键和外键。 约束具备信息性,系统不会强制执行。 请参阅 SQL 语言参考中的 CONSTRAINT 子句。
注释
若要定义表约束,管道必须是启用了 Unity Catalog 的管道。
table_clauses
(可选)指定表的分区、注释和用户定义的属性。 每个子句只能指定一次。
使用 DELTA
指定数据格式。 唯一的选项是 DELTA。
此子句是可选的,默认为 DELTA。
PARTITIONED BY
包含一列或多列的可选列表,用于对表进行分区。 与
CLUSTER BY
互斥。Liquid 聚类分析为聚类分析提供了灵活的优化解决方案。 请考虑为 Lakeflow 声明性管道使用
CLUSTER BY
而不是PARTITIONED BY
。按组排序
对表启用动态聚类,并定义要用作聚类键的列。 使用自动聚类功能与
CLUSTER BY AUTO
,Databricks智能地选择聚类键以优化查询性能。 与PARTITIONED BY
互斥。请参阅 对表使用液体聚类分析。
LOCATION
表数据的可选存储位置。 如果未设置,系统将默认为管道存储位置。
评论
用于描述表的可选
STRING
文本。TBLPROPERTIES
表的表属性可选列表。
WITH ROW FILTER
重要
此功能目前以公共预览版提供。
向表中添加行筛选器函数。 将来对该表的查询会收到函数计算结果为 TRUE 的行的子集。 这对于精细的访问控制很有用,因为它允许函数检查调用用户的标识和组成员身份以决定是否筛选某些行。
请参阅
ROW FILTER
条款。-
此子句使用
query
中的数据来填充表。 此查询必须是流式处理查询。 要使用流式处理语义从源中读取,请使用 STREAM 关键字。 如果读取遇到对现有记录的更改或删除,则会引发错误。 从静态源或仅限追加的源读取是最安全的。 若要引入具有更改提交的数据,可以使用 Python 和SkipChangeCommits
选项来处理错误。同时指定
query
和table_specification
时,table_specification
中指定的表架构必须包含query
返回的所有列,否则会出现错误。 在table_specification
中指定但未由query
返回的任何列在查询时都返回null
值。有关流数据的详细信息,请参阅 使用管道转换数据。
所需的权限
管道的运行方式用户必须具有以下权限:
- 对流式处理表引用的基表的
SELECT
特权。 - 对父目录的
USE CATALOG
特权和对父架构的USE SCHEMA
特权。 - 对流式处理表架构的
CREATE MATERIALIZED VIEW
特权。
为了使用户能够更新在其中定义流式处理表的管道,他们需要:
- 对父目录的
USE CATALOG
特权和对父架构的USE SCHEMA
特权。 - 流式处理表的所有权或对流式处理表的
REFRESH
特权。 - 流式处理表所有者必须对流式处理表引用的基表具有
SELECT
特权。
要使用户能够查询生成的流式处理表,他们需要:
- 对父目录的
USE CATALOG
特权和对父架构的USE SCHEMA
特权。 -
SELECT
流式处理表的特权。
局限性
- 只有表所有者才能刷新流式处理表以获取最新数据。
- 流式处理表上不允许
ALTER TABLE
命令。 应该通过CREATE OR REFRESH
或 ALTER STREAMING TABLE 语句更改该表的定义和属性。 - 不支持通过 DML 命令(如
INSERT INTO
和MERGE
)来发展表模式。 - 流式数据表不支持以下命令:
CREATE TABLE ... CLONE <streaming_table>
COPY INTO
ANALYZE TABLE
RESTORE
TRUNCATE
GENERATE MANIFEST
[CREATE OR] REPLACE TABLE
- 不支持重命名表或更改所有者。
- 不支持生成的列、标识列和默认列。
例子
-- Define a streaming table from a volume of files:
CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/customers/*", format => "csv")
-- Define a streaming table from a streaming source table:
CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)
-- Define a table with a row filter and column mask:
CREATE OR REFRESH STREAMING TABLE customers_silver (
id int COMMENT 'This is the customer ID',
name string,
region string,
ssn string MASK catalog.schema.ssn_mask_fn COMMENT 'SSN masked for privacy'
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT * FROM STREAM(customers_bronze)
-- Define a streaming table that you can add flows into:
CREATE OR REFRESH STREAMING TABLE orders;