CREATE STREAMING TABLE

适用于:勾选“是” Databricks SQL 勾选“是” Databricks Runtime 13.1 及更高版本

重要

此功能目前以公共预览版提供。 要注册以进行访问,请填写此表格

建立流式处理表,它是 Delta 表,额外支持流式处理或增量数据处理。

流式处理表仅在增量实时表和具有 Unity Catalog 的 Databricks SQL 中受支持。 在支持的 Databricks Runtime 计算上运行此命令仅分析语法。 请参阅使用 SQL 实现 Delta Live Tables 管道

语法

{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
  table_name
  [ table_specification ]
  [ table_clauses ]
  [ AS query ]

table_specification
  ( [ column_identifier column_type [ NOT NULL ]
      [ COMMENT column_comment ] [ column_constraint ]
    ] [, ...]
    [ CONSTRAINT expectation_name EXPECT (expectation_expr)
      [ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
    [ , table_constraint ] [...] )

table_clauses
  { PARTITIONED BY (col [, ...]) |
    COMMENT table_comment |
    TBLPROPERTIES clause |
    SCHEDULE [ REFRESH ] CRON cron_string [ AT TIME ZONE timezone_id ] } [...]

参数

  • REFRESH

    如果指定,则使用查询中定义的源中的最新可用数据刷新表。 只有在查询开始之前到达的新数据才会被处理。 在执行命令期间添加到源的新数据将被忽略,直到下一次刷新为止。

  • IF NOT EXISTS

    如果指定了该参数,并且已存在名称相同的表,则会忽略该语句。

    IF NOT EXISTS 无法与 REFRESH 一起使用,这表示不允许 CREATE OR REFRESH TABLE IF NOT EXISTS

  • table_name

    要创建的表的名称。 名称不得包含时态规范。 如果未限定该名称,则会在当前架构中创建该表。

  • table_specification

    此可选子句定义列的列表、列的类型、属性、说明和列约束。

    如果未在表架构中定义列,则必须指定 AS query

    • column_identifier

      列的唯一名称。

      • column_type

        指定列的数据类型

      • NOT NULL

        如果已指定,则列不会接受 NULL 值。

      • COMMENT column_comment

        用于描述列的字符串字面量。

      • column_constraint

        重要

        此功能目前以公共预览版提供。

        将主键或外键约束添加到流式处理表中的列。 hive_metastore 目录中的表不支持约束。

      • CONSTRAINT expectation_name EXPECT (expectation_expr) [ ON VIOLATION { FAIL UPDATE | DROP ROW } ]

        将数据质量预期添加到表中。 这些数据质量预期可以随着时间的推移进行跟踪,并通过流式处理表的事件日志进行访问。 在创建表和刷新表时,FAIL UPDATE 期望会导致处理失败。 如果未满足 DROP ROW 预期,则预期会导致删除整行。

        expectation_expr 可能包含文本、表中的列标识符以及内置的确定性 SQL 函数或运算符,但以下内容除外:

        此外,expr 不能包含任何expr

      • table_constraint

        重要

        此功能目前以公共预览版提供。

        向流式处理表添加信息性主键或信息性外键约束。 hive_metastore 目录中的表不支持键约束。

  • table_clauses

    为新表指定分区、注释、用户定义的属性和刷新计划(可选)。 每个子子句只能指定一次。

    • PARTITIONED BY

      表中用于对表进行分区的列可选列表。

    • COMMENT table_comment

      用于描述表的 STRING 文本。

    • TBLPROPERTIES

      可以选择设置一个或多个用户定义的属性。

    • SCHEDULE [ REFRESH ] CRON cron_string [ AT TIME ZONE timezone_id ]

      如果提供计划,则它会流式处理表或具体化视图,以使用给定的 quartz cron 计划刷新其数据。 仅接受 time_zone_values。 不支持 AT TIME ZONE LOCAL。 如果 AT TIME ZONE 不存在,则使用会话时区。 如果 AT TIME ZONE 不存在并且未设置会话时区,则会引发错误。 SCHEDULE 在语义上等效于 SCHEDULE REFRESH

      不能在增量实时表管道定义中使用 SCHEDULE 语法。

      CREATE OR REFRESH 命令中不允许使用子句 SCHEDULE。 计划可以作为 CREATE 命令的一部分提供。 使用 ALTER STREAMING TABLE 可以在创建后更改流式处理表的计划。

  • AS 查询

    此子句使用 query 中的数据来填充表。 此查询必须是流式处理查询。 这可以通过将 STREAM 关键字添加到要增量处理的任何关系中来实现。 同时指定 querytable_specification 时,table_specification 中指定的表架构必须包含 query 返回的所有列,否则会出现错误。 在 table_specification 中指定但未由 query 返回的任何列在查询时都返回 null 值。

    此子句对于在 Databricks SQL 中创建的流式处理表是必需的,但在增量实时表中不是必需的。 如果增量实时表中未提供此子句,则必须在 DLT 管道中的 APPLY CHANGES 命令中引用此表。 请参阅,在增量实时表中使用 SQL 进行变更数据捕获

流式处理表与其他表之间的差异

流式处理表是有状态的表,设计用于在处理不断增长的数据集时只处理每一行一次。 由于大部分数据集会随着时间不断扩大,因此流式处理表非常适合用于大多数引入工作负载。 流式处理表最适合需要全新数据和低延迟的管道。 流式处理表还可用于大规模转换,因为随着新数据的到达可以增量计算结果,使结果保持最新,而无需在每次更新时完全重新计算所有源数据。 流式处理表专为仅追加的数据源而设计。

流式处理表接受其他命令,例如 REFRESH,啟处理查询中提供的源中可用的最新数据。 对所提供查询的更改仅通过调用 REFRESH 反映在新数据上,而不是以前处理过的数据。 要将更改也应用于现有数据,需要执行 REFRESH TABLE <table_name> FULL 来执行 FULL REFRESH。 完全刷新使用最新定义重新处理源中的所有可用数据。 不建议对不保留整个数据历史记录或保留期较短的源(如 Kafka)调用完全刷新,因为完全刷新会截断现有数据。 如果数据在源中不再可用,则可能无法恢复旧数据。

限制

  • 只有表所有者才能刷新流式处理表以获取最新数据。

  • 流式处理表上不允许 ALTER TABLE 命令。 表的定义和属性应该通过 ALTER STREAMING TABLE 语句进行更改。

  • 不支持按时间顺序查看查询。

  • 不支持通过 DML 命令(如 INSERT INTOMERGE)来发展表模式。

  • 流式处理表不支持以下命令:

    • CREATE TABLE ... CLONE <streaming_table>
    • COPY INTO
    • ANALYZE TABLE
    • RESTORE
    • TRUNCATE
    • GENERATE MANIFEST
    • [CREATE OR] REPLACE TABLE
  • 不支持 Delta Sharing。

  • 不支持重命名表或更改所有者。

  • 不支持 PRIMARY KEYFOREIGN KEY 等表约束。

  • 不支持生成的列、标识列和默认列。

示例

-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
  AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.chinacloudapi.cn/base/path');

-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
    CONSTRAINT date_parsing (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
  )
  AS SELECT *
  FROM STREAM read_files('gs://my-bucket/avroData');

-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
  COMMENT 'Stores the raw data from Kafka'
  TBLPROPERTIES ('delta.appendOnly' = 'true')
  AS SELECT
    value raw_data,
    offset,
    timestamp,
    timestampType
  FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');

-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
  SCHEDULE CRON '0 0 * * * ? *'
  AS SELECT
    from_json(raw_data, 'schema_string') data,
    * EXCEPT (raw_data)
  FROM STREAM firehose_raw;

-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int PRIMARY KEY,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string,
    CONSTRAINT pk_id PRIMARY KEY (id)
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');