CREATE STREAMING TABLE
适用于: Databricks SQL
建立流式处理表,它是 Delta 表,额外支持流式处理或增量数据处理。
流式处理表仅在增量实时表和具有 Unity Catalog 的 Databricks SQL 中受支持。 在支持的 Databricks Runtime 计算上运行此命令仅分析语法。 请参阅使用 SQL 开发管道代码。
语法
{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
table_name
[ table_specification ]
[ table_clauses ]
[ AS query ]
table_specification
( { column_identifier column_type [column_properties] } [, ...]
[ CONSTRAINT expectation_name EXPECT (expectation_expr)
[ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
[ , table_constraint ] [...] )
column_properties
{ NOT NULL |
COMMENT column_comment |
column_constraint |
MASK clause } [ ... ]
table_clauses
{ PARTITIONED BY (col [, ...]) |
COMMENT table_comment |
TBLPROPERTIES clause |
SCHEDULE [ REFRESH ] schedule_clause |
WITH { ROW FILTER clause } } [...]
schedule_clause
{ EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS } |
CRON cron_string [ AT TIME ZONE timezone_id ] }
参数
REFRESH
如果指定,则使用查询中定义的源中的最新可用数据刷新表。 只有在查询开始之前到达的新数据才会被处理。 在执行命令期间添加到源的新数据将被忽略,直到下一次刷新为止。 CREATE OR REFRESH 中的刷新操作是完全声明性的。 如果刷新命令未指定原始表创建语句中的所有元数据,则会删除未指定的元数据。
IF NOT EXISTS
如果流式处理表不存在,则创建该表。 如果已存在具有此名称的表,则忽略
CREATE STREAMING TABLE
语句。最多可以指定
IF NOT EXISTS
或OR REFRESH
中的一项。-
要创建的表的名称。 名称不得包含时态规范或选项规范。 如果未限定该名称,则会在当前架构中创建该表。
table_specification
此可选子句定义列的列表、列的类型、属性、说明和列约束。
如果未在表架构中定义列,则必须指定
AS query
。-
列的唯一名称。
-
指定列的数据类型。
NOT NULL
如果已指定,则列不会接受
NULL
值。COMMENT column_comment
用于描述列的字符串字面量。
-
重要
此功能目前以公共预览版提供。
将主键或外键约束添加到流式处理表中的列。
hive_metastore
目录中的表不支持约束。 -
重要
此功能目前以公共预览版提供。
添加列掩码函数以对敏感数据进行匿名化处理。 该列的所有后续查询将会收到对该列计算该函数(而不是该列的原始值)的结果。 这对于细粒度的访问控制目的非常有用,在这种情况下,该函数可以检查调用用户的身份或组成员身份,以便决定是否编辑该值。
CONSTRAINT expectation_name EXPECT (expectation_expr) [ ON VIOLATION { FAIL UPDATE | DROP ROW } ]
将数据质量预期添加到表中。 这些数据质量预期可以随着时间的推移进行跟踪,并通过流式处理表的事件日志进行访问。 在创建表和刷新表时,
FAIL UPDATE
期望会导致处理失败。 如果未满足DROP ROW
预期,则预期会导致删除整行。expectation_expr
可能包含文本、表中的列标识符以及内置的确定性 SQL 函数或运算符,但以下内容除外:此外,
expr
不能包含任何expr
。-
重要
此功能目前以公共预览版提供。
向流式处理表添加信息性主键或信息性外键约束。
hive_metastore
目录中的表不支持键约束。
-
-
table_clauses
为新表指定分区、注释、用户定义的属性和刷新计划(可选)。 每个子子句只能指定一次。
-
表中用于对表进行分区的列可选列表。
COMMENT table_comment
用于描述表的
STRING
文本。-
可以选择设置一个或多个用户定义的属性。
使用此设置可指定用于运行此语句的增量实时表运行时通道。 将
pipelines.channel
属性的值设置为"PREVIEW"
或"CURRENT"
。 默认值为"CURRENT"
。 有关增量实时表通道的详细信息,请参阅增量实时表运行时通道。 SCHEDULE [ REFRESH ] schedule_clause
EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS }
重要
此功能目前以公共预览版提供。
要安排定期进行的刷新,请使用
EVERY
语法。 如果指定了EVERY
语法,则会按照指定的间隔根据提供的值(如HOUR
、HOURS
、DAY
、DAYS
、WEEK
或WEEKS
)定期刷新流式处理表或具体化视图。 下表列出了number
可接受的整数值。Time unit 整数值 HOUR or HOURS
1 < = H < = 72 DAY or DAYS
1 < = D < = 31 WEEK or WEEKS
1 < = W < = 8 注意
包含的时间单位的单数形式和复数形式在语义上是等效的。
CRON cron_string [ AT TIME ZONE timezone_id ]
使用 qartz cron 值安排刷新。 接受有效 time_zone_values。 不支持
AT TIME ZONE LOCAL
。如果
AT TIME ZONE
不存在,则使用会话时区。 如果AT TIME ZONE
不存在并且未设置会话时区,则会引发错误。SCHEDULE
在语义上等效于SCHEDULE REFRESH
。
计划可以作为
CREATE
命令的一部分提供。 使用 ALTER STREAMING TABLE 或运行包含SCHEDULE
子句的CREATE OR REFRESH
命令在创建后更改流式处理表的计划。WITH ROW FILTER 子句
重要
此功能目前以公共预览版提供。
向表中添加行筛选器函数。 该表中的所有后续查询都将收到函数计算结果为布尔值 TRUE 的行的子集。 这对于细粒度的访问控制目的非常有用,在这种情况下,该函数可以检查调用用户的身份或组成员身份,以决定是否筛选特定行。
-
AS 查询
此子句使用
query
中的数据来填充表。 此查询必须是流式处理查询。 这可以通过将STREAM
关键字添加到要增量处理的任何关系中来实现。 同时指定query
和table_specification
时,table_specification
中指定的表架构必须包含query
返回的所有列,否则会出现错误。 在table_specification
中指定但未由query
返回的任何列在查询时都返回null
值。
流式处理表与其他表之间的差异
流式处理表是有状态的表,设计用于在处理不断增长的数据集时只处理每一行一次。 由于大部分数据集会随着时间不断扩大,因此流式处理表非常适合用于大多数引入工作负载。 流式处理表最适合需要全新数据和低延迟的管道。 流式处理表还可用于大规模转换,因为随着新数据的到达可以增量计算结果,使结果保持最新,而无需在每次更新时完全重新计算所有源数据。 流式处理表专为仅追加的数据源而设计。
流式处理表接受其他命令,例如 REFRESH
,啟处理查询中提供的源中可用的最新数据。 对所提供查询的更改仅通过调用 REFRESH
反映在新数据上,而不是以前处理过的数据。 要将更改也应用于现有数据,需要执行 REFRESH TABLE <table_name> FULL
来执行 FULL REFRESH
。 完全刷新使用最新定义重新处理源中的所有可用数据。 不建议对不保留整个数据历史记录或保留期较短的源(如 Kafka)调用完全刷新,因为完全刷新会截断现有数据。 如果数据在源中不再可用,则可能无法恢复旧数据。
行筛选器和列掩码
重要
此功能目前以公共预览版提供。
行筛选器允许指定在表扫描提取行时作为筛选器应用的函数。 这些筛选器可确保后续查询仅返回筛选器谓词的计算结果为 true 的行。
每当表扫描提取行时,列掩码就会让你将列值掩码。 将来所有涉及该列的查询都会收到对该列计算函数后的结果,并替换列的原始值。
有关如何使用行筛选器和列掩码的详细信息,请参阅使用行筛选器和列掩码筛选敏感表数据。
管理行筛选器和列掩码
应通过 CREATE OR REFRESH
语句添加、更新或删除流式处理表上的行筛选器和列掩码。
行为
- 以定义者身份刷新:当
CREATE OR REFRESH
或REFRESH
语句刷新流式处理表时,行筛选器函数将以定义者的权限(以表所有者的身份)运行。 这意味着表刷新使用创建流式处理表的用户的安全上下文。 - 查询:虽然大多数筛选器都以定义者的权限运行,但检查用户上下文(例如
CURRENT_USER
和IS_MEMBER
)的函数并非如此。 这些函数作为调用程序运行。 此方法基于当前用户的上下文强制实施特定于用户的数据安全和访问控制。
可观察性
使用DESCRIBE EXTENDED
、INFORMATION_SCHEMA
或目录资源管理器检查应用于给定流式处理表的现有行筛选器和列掩码。 此功能允许用户审核和查看有关流式处理表的数据访问和保护措施。
限制
只有表所有者才能刷新流式处理表以获取最新数据。
流式处理表上不允许
ALTER TABLE
命令。 应该通过CREATE OR REFRESH
或 ALTER STREAMING TABLE 语句更改该表的定义和属性。不支持按时间顺序查看查询。
不支持通过 DML 命令(如
INSERT INTO
和MERGE
)来发展表模式。流式处理表不支持以下命令:
CREATE TABLE ... CLONE <streaming_table>
COPY INTO
ANALYZE TABLE
RESTORE
TRUNCATE
GENERATE MANIFEST
[CREATE OR] REPLACE TABLE
不支持 Delta Sharing。
不支持重命名表或更改所有者。
不支持
PRIMARY KEY
和FOREIGN KEY
等表约束。不支持生成的列、标识列和默认列。
示例
-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.chinacloudapi.cn/base/path');
-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
CONSTRAINT date_parsing EXPECT (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
)
AS SELECT *
FROM STREAM read_files('gs://my-bucket/avroData');
-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
COMMENT 'Stores the raw data from Kafka'
TBLPROPERTIES ('delta.appendOnly' = 'true')
AS SELECT
value raw_data,
offset,
timestamp,
timestampType
FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');
-- Sets the runtime channel to "PREVIEW"
> CREATE STREAMING TABLE st_preview
TBLPROPERTIES(pipelines.channel = "PREVIEW")
AS SELECT * FROM RANGE(10)
-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
SCHEDULE CRON '0 0 * * * ? *'
AS SELECT
from_json(raw_data, 'schema_string') data,
* EXCEPT (raw_data)
FROM STREAM firehose_raw;
-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int PRIMARY KEY,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string,
CONSTRAINT pk_id PRIMARY KEY (id)
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a row filter and a column mask
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
id int,
name string,
region string,
ssn string MASK catalog.schema.ssn_mask_fn
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT *
FROM STREAM read_files('s3://bucket/path/sensitive_data')