CREATE STREAMING TABLE

适用于: 勾选“是” Databricks SQL

建立流式处理表,它是 Delta 表,额外支持流式处理或增量数据处理。

流式处理表仅在增量实时表和具有 Unity Catalog 的 Databricks SQL 中受支持。 在支持的 Databricks Runtime 计算上运行此命令仅分析语法。 请参阅使用 SQL 开发管道代码

语法

{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
  table_name
  [ table_specification ]
  [ table_clauses ]
  [ AS query ]

table_specification
  ( { column_identifier column_type [column_properties] } [, ...]
    [ CONSTRAINT expectation_name EXPECT (expectation_expr)
      [ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
    [ , table_constraint ] [...] )

column_properties
  { NOT NULL |
    COMMENT column_comment |
    column_constraint |
    MASK clause } [ ... ]

table_clauses
  { PARTITIONED BY (col [, ...]) |
    COMMENT table_comment |
    TBLPROPERTIES clause |
    SCHEDULE [ REFRESH ] schedule_clause |
    WITH { ROW FILTER clause } } [...]

schedule_clause
  { EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS } |
  CRON cron_string [ AT TIME ZONE timezone_id ] }

参数

  • REFRESH

    如果指定,则使用查询中定义的源中的最新可用数据刷新表。 只有在查询开始之前到达的新数据才会被处理。 在执行命令期间添加到源的新数据将被忽略,直到下一次刷新为止。 CREATE OR REFRESH 中的刷新操作是完全声明性的。 如果刷新命令未指定原始表创建语句中的所有元数据,则会删除未指定的元数据。

  • IF NOT EXISTS

    如果流式处理表不存在,则创建该表。 如果已存在具有此名称的表,则忽略 CREATE STREAMING TABLE 语句。

    最多可以指定 IF NOT EXISTSOR REFRESH 中的一项。

  • table_name

    要创建的表的名称。 名称不得包含时态规范或选项规范。 如果未限定该名称,则会在当前架构中创建该表。

  • table_specification

    此可选子句定义列的列表、列的类型、属性、说明和列约束。

    如果未在表架构中定义列,则必须指定 AS query

    • column_identifier

      列的唯一名称。

      • column_type

        指定列的数据类型

      • NOT NULL

        如果已指定,则列不会接受 NULL 值。

      • COMMENT column_comment

        用于描述列的字符串字面量。

      • column_constraint

        重要

        此功能目前以公共预览版提供。

        将主键或外键约束添加到流式处理表中的列。 hive_metastore 目录中的表不支持约束。

      • MASK 子句

        重要

        此功能目前以公共预览版提供。

        添加列掩码函数以对敏感数据进行匿名化处理。 该列的所有后续查询将会收到对该列计算该函数(而不是该列的原始值)的结果。 这对于细粒度的访问控制目的非常有用,在这种情况下,该函数可以检查调用用户的身份或组成员身份,以便决定是否编辑该值。

      • CONSTRAINT expectation_name EXPECT (expectation_expr) [ ON VIOLATION { FAIL UPDATE | DROP ROW } ]

        将数据质量预期添加到表中。 这些数据质量预期可以随着时间的推移进行跟踪,并通过流式处理表的事件日志进行访问。 在创建表和刷新表时,FAIL UPDATE 期望会导致处理失败。 如果未满足 DROP ROW 预期,则预期会导致删除整行。

        expectation_expr 可能包含文本、表中的列标识符以及内置的确定性 SQL 函数或运算符,但以下内容除外:

        此外,expr 不能包含任何expr

      • table_constraint

        重要

        此功能目前以公共预览版提供。

        向流式处理表添加信息性主键或信息性外键约束。 hive_metastore 目录中的表不支持键约束。

  • table_clauses

    为新表指定分区、注释、用户定义的属性和刷新计划(可选)。 每个子子句只能指定一次。

    • PARTITIONED BY

      表中用于对表进行分区的列可选列表。

    • COMMENT table_comment

      用于描述表的 STRING 文本。

    • TBLPROPERTIES

      可以选择设置一个或多个用户定义的属性。

      使用此设置可指定用于运行此语句的增量实时表运行时通道。 将 pipelines.channel 属性的值设置为 "PREVIEW""CURRENT"。 默认值为 "CURRENT"。 有关增量实时表通道的详细信息,请参阅增量实时表运行时通道

    • SCHEDULE [ REFRESH ] schedule_clause

      • EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS }

        重要

        此功能目前以公共预览版提供。

        要安排定期进行的刷新,请使用 EVERY 语法。 如果指定了 EVERY 语法,则会按照指定的间隔根据提供的值(如 HOURHOURSDAYDAYSWEEKWEEKS)定期刷新流式处理表或具体化视图。 下表列出了 number 可接受的整数值。

        Time unit 整数值
        HOUR or HOURS 1 < = H < = 72
        DAY or DAYS 1 < = D < = 31
        WEEK or WEEKS 1 < = W < = 8

        注意

        包含的时间单位的单数形式和复数形式在语义上是等效的。

      • CRON cron_string [ AT TIME ZONE timezone_id ]

        使用 qartz cron 值安排刷新。 接受有效 time_zone_values。 不支持 AT TIME ZONE LOCAL

        如果 AT TIME ZONE 不存在,则使用会话时区。 如果 AT TIME ZONE 不存在并且未设置会话时区,则会引发错误。 SCHEDULE 在语义上等效于 SCHEDULE REFRESH

      计划可以作为 CREATE 命令的一部分提供。 使用 ALTER STREAMING TABLE 或运行包含 SCHEDULE 子句的 CREATE OR REFRESH 命令在创建后更改流式处理表的计划。

    • WITH ROW FILTER 子句

      重要

      此功能目前以公共预览版提供。

      向表中添加行筛选器函数。 该表中的所有后续查询都将收到函数计算结果为布尔值 TRUE 的行的子集。 这对于细粒度的访问控制目的非常有用,在这种情况下,该函数可以检查调用用户的身份或组成员身份,以决定是否筛选特定行。

  • AS 查询

    此子句使用 query 中的数据来填充表。 此查询必须是流式处理查询。 这可以通过将 STREAM 关键字添加到要增量处理的任何关系中来实现。 同时指定 querytable_specification 时,table_specification 中指定的表架构必须包含 query 返回的所有列,否则会出现错误。 在 table_specification 中指定但未由 query 返回的任何列在查询时都返回 null 值。

流式处理表与其他表之间的差异

流式处理表是有状态的表,设计用于在处理不断增长的数据集时只处理每一行一次。 由于大部分数据集会随着时间不断扩大,因此流式处理表非常适合用于大多数引入工作负载。 流式处理表最适合需要全新数据和低延迟的管道。 流式处理表还可用于大规模转换,因为随着新数据的到达可以增量计算结果,使结果保持最新,而无需在每次更新时完全重新计算所有源数据。 流式处理表专为仅追加的数据源而设计。

流式处理表接受其他命令,例如 REFRESH,啟处理查询中提供的源中可用的最新数据。 对所提供查询的更改仅通过调用 REFRESH 反映在新数据上,而不是以前处理过的数据。 要将更改也应用于现有数据,需要执行 REFRESH TABLE <table_name> FULL 来执行 FULL REFRESH。 完全刷新使用最新定义重新处理源中的所有可用数据。 不建议对不保留整个数据历史记录或保留期较短的源(如 Kafka)调用完全刷新,因为完全刷新会截断现有数据。 如果数据在源中不再可用,则可能无法恢复旧数据。

行筛选器和列掩码

重要

此功能目前以公共预览版提供。

行筛选器允许指定在表扫描提取行时作为筛选器应用的函数。 这些筛选器可确保后续查询仅返回筛选器谓词的计算结果为 true 的行。

每当表扫描提取行时,列掩码就会让你将列值掩码。 将来所有涉及该列的查询都会收到对该列计算函数后的结果,并替换列的原始值。

有关如何使用行筛选器和列掩码的详细信息,请参阅使用行筛选器和列掩码筛选敏感表数据

管理行筛选器和列掩码

应通过 CREATE OR REFRESH 语句添加、更新或删除流式处理表上的行筛选器和列掩码。

行为

  • 以定义者身份刷新:当 CREATE OR REFRESHREFRESH 语句刷新流式处理表时,行筛选器函数将以定义者的权限(以表所有者的身份)运行。 这意味着表刷新使用创建流式处理表的用户的安全上下文。
  • 查询:虽然大多数筛选器都以定义者的权限运行,但检查用户上下文(例如 CURRENT_USERIS_MEMBER)的函数并非如此。 这些函数作为调用程序运行。 此方法基于当前用户的上下文强制实施特定于用户的数据安全和访问控制。

可观察性

使用DESCRIBE EXTENDEDINFORMATION_SCHEMA或目录资源管理器检查应用于给定流式处理表的现有行筛选器和列掩码。 此功能允许用户审核和查看有关流式处理表的数据访问和保护措施。

限制

  • 只有表所有者才能刷新流式处理表以获取最新数据。

  • 流式处理表上不允许 ALTER TABLE 命令。 应该通过 CREATE OR REFRESHALTER STREAMING TABLE 语句更改该表的定义和属性。

  • 不支持按时间顺序查看查询。

  • 不支持通过 DML 命令(如 INSERT INTOMERGE)来发展表模式。

  • 流式处理表不支持以下命令:

    • CREATE TABLE ... CLONE <streaming_table>
    • COPY INTO
    • ANALYZE TABLE
    • RESTORE
    • TRUNCATE
    • GENERATE MANIFEST
    • [CREATE OR] REPLACE TABLE
  • 不支持 Delta Sharing。

  • 不支持重命名表或更改所有者。

  • 不支持 PRIMARY KEYFOREIGN KEY 等表约束。

  • 不支持生成的列、标识列和默认列。

示例

-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
  AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.chinacloudapi.cn/base/path');

-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
    CONSTRAINT date_parsing EXPECT (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
  )
  AS SELECT *
  FROM STREAM read_files('gs://my-bucket/avroData');

-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
  COMMENT 'Stores the raw data from Kafka'
  TBLPROPERTIES ('delta.appendOnly' = 'true')
  AS SELECT
    value raw_data,
    offset,
    timestamp,
    timestampType
  FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');

-- Sets the runtime channel to "PREVIEW"
> CREATE STREAMING TABLE st_preview
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  AS SELECT * FROM RANGE(10)

-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
  SCHEDULE CRON '0 0 * * * ? *'
  AS SELECT
    from_json(raw_data, 'schema_string') data,
    * EXCEPT (raw_data)
  FROM STREAM firehose_raw;

-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int PRIMARY KEY,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string,
    CONSTRAINT pk_id PRIMARY KEY (id)
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a row filter and a column mask
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
    id int,
    name string,
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn
  )
  WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
  AS SELECT *
  FROM STREAM read_files('s3://bucket/path/sensitive_data')