将 Unity Catalog 与增量实时表管道配合使用

重要

Unity Catalog 的增量实时表支持目前以公共预览版提供。

除了通过现有的支持将表持久保存到 Hive 元存储之外,你还可以将 Unity Catalog 与增量实时表管道配合使用,以便:

  • 在 Unity Catalog 中定义一个目录,管道将在其中持久保存表。
  • 从 Unity Catalog 表读取数据。

工作区可以包含使用 Unity Catalog 或 Hive 元存储的管道。 但是,单个管道无法同时写入 Hive 元存储和 Unity Catalog,并且现有管道无法升级为使用 Unity Catalog。 不使用 Unity Catalog 的现有管道不受此预览版的影响,而会继续使用配置的存储位置将数据持久保存到 Hive 元存储。

除非本文档中另有说明,否则使用 Unity Catalog 的管道支持所有现有的数据源和增量实时表功能。 使用 Unity Catalog 的管道支持 PythonSQL 接口。

还可以使用 Databricks Runtime 13.3 LTS 及更高版本或 SQL 仓库从共享的 Unity Catalog 群集中查询在管道中创建的表。 无法从已分配或未隔离的群集查询表。

若要管理对 Unity Catalog 管道所创建的表的权限,请使用 GRANT 和 REVOKE

要求

通过增量实时表管道在 Unity Catalog 中创建表需要满足以下要求:

  • 你必须对目标目录拥有 USE CATALOG 特权。
  • 如果管道创建具体化视图,你必须在目标架构中拥有 CREATE MATERIALIZED VIEWUSE SCHEMA 特权。
  • 如果管道创建流式表,你必须在目标架构中拥有 CREATE TABLEUSE SCHEMA 特权。
  • 如果管道设置中未指定目标架构,你必须对目标目录中的至少一个架构拥有 CREATE MATERIALIZED VIEWCREATE TABLE 特权。

限制

下面是将 Unity Catalog 与增量实时表配合使用时的限制:

  • 默认情况下,只有管道所有者和工作区管理员有权查看运行启用了 Unity 目录的管道的群集中的驱动程序日志。 若要使其他用户能够查看驱动程序日志,请参阅允许非管理员用户从启用了 Unity 目录的管道查看驱动程序日志

  • 使用 Hive 元存储的现有管道无法升级为使用 Unity Catalog。 若要迁移写入到 Hive 元存储的现有管道,必须创建新管道,并从数据源重新引入数据。

  • 不能在附加到元存储(在 Unity Catalog 公共预览期间创建)的工作区中创建已启用 Unity Catalog 的管道。 请参阅升级到特权继承

  • 不支持 JAR。 仅 Python 库支持安装第三方库。 请参阅管理增量实时表管道的 Python 依赖项

  • 不支持修改流式表架构的数据操作语言(DML)查询。

  • 在增量实时表管道中创建的具体化视图不能用作该管道外部(例如,在另一个管道或下游笔记本中)的流式处理源。

  • 仅在预览频道中支持发布到可指定托管存储位置的架构。

  • 如果某个管道发布到具有托管存储位置的架构,则可以在后续更新中更改架构,但唯一的前提是更新的架构使用与之前指定的架构相同的存储位置。

  • 如果目标架构指定存储位置,则所有表都存储在该位置。 如果未指定架构存储位置,但目标目录指定了一个目录存储位置,则表存储在该目录存储位置中。 如果未指定架构和目录存储位置,表将存储在发布表的元存储的根存储位置。

  • 目录资源管理器中的“历史记录”选项卡不显示流式表和具体化视图的历史记录。

  • 定义表时不支持 LOCATION 属性。

  • 已启用 Unity Catalog 的管道无法发布到 Hive 元存储。

  • Python UDF 支持为公共预览版。 若要使用 Python UDF,管道必须使用预览通道

  • 不能将 Delta Sharing 用于发布到 Unity Catalog 的增量实时表具体化视图或流式表。

  • 不能在管道或查询中使用 event_log 表值函数来访问多个管道的事件日志。

  • 不能与其他用户共享基于 event_log 表值函数创建的视图。

  • 已启用 Unity Catalog 的管道不支持单节点群集。 由于增量实时表可能会创建单节点群集以运行较小的管道,因此管道可能会失败并显示引用single-node mode的错误消息。 如果发生这种情况,请确保在配置计算设置时至少指定一个辅助角色。

  • 无法从已分配的群集或非隔离群集查询在已启用 Unity Catalog 的管道中创建的表。 若要查询由增量实时表管道创建的表,必须使用采用 Databricks Runtime 13.3 LTS 及更高版本或 SQL 仓库的共享访问模式群集。

  • 增量实时表使用共享访问模式群集来运行已启用 Unity Catalog 的管道。 无法在已分配的群集上运行已启用 Unity Catalog 的管道。 若要了解 Unity Catalog 共享访问模式的限制,请参阅 Unity Catalog 上的共享访问模式限制

  • 不能对发布到 Unity Catalog 的具体化视图或流式表使用行筛选器或列过滤

注意

支持具体化视图的基础文件可能包含来自上游表的数据(包括可能的个人身份信息),这些数据未出现在具体化视图定义中。 此数据会自动添加到基础存储,从而支持具体化视图的增量刷新。

由于具体化视图的基础文件可能会暴露来自不属于具体化视图架构的上游表的数据,因此 Databricks 建议不要与不受信任的下游使用者共享基础存储。

例如,假设具体化视图的定义包含COUNT(DISTINCT field_a)子句。 即使具体化视图定义仅包含聚合COUNT DISTINCT子句,基础文件也会包含field_a的实际值列表。

对现有功能的更改

当增量实时表配置为将数据保存到 Unity Catalog 时,表的生命周期由增量实时表管道管理。 因为管道管理表生命周期和权限:

  • 从增量实时表管道定义中删除某个表时,相应的具体化视图或流式表条目会在下一次管道更新时从 Unity Catalog 中删除。 实际数据会保留一段时间,以便在误删时可以恢复。 可以通过将具体化视图或流式表添加回管道定义来恢复数据。
  • 删除增量实时表管道会导致删除该管道中定义的所有表。 由于此项更改,增量实时表UI 已更新,现在会提示确认删除管道。
  • 用户无法直接访问内部支持表(包括用于支持 APPLY CHANGES INTO 的支持表)。

从增量实时表管道将表写入 Unity Catalog

注意

如果未为管道选择目录和目标架构,则表不会发布到 Unity Catalog,并且只能供同一管道中的查询访问。

若要将表写入 Unity Catalog,创建管道时,请在“存储选项”下选择“Unity Catalog”,在“目录”下拉菜单中选择一个目录,然后在“目标架构”下拉菜单中选择一个现有架构或输入新架构的名称。 若要了解 Unity Catalog 目录,请参阅 Azure Databricks 中的目录是什么?。 若要了解 Unity Catalog 中的架构,请参阅 Azure Databricks 中的架构是什么?

将数据引入 Unity Catalog 管道

配置为使用 Unity Catalog 的管道可从以下位置读取数据:

  • Unity Catalog 托管和外部表、视图、具体化视图和流式表。
  • Hive 元存储表和视图。
  • 自动加载程序使用 cloud_files() 函数从 Unity Catalog 外部位置读取数据。
  • Apache Kafka 和 Amazon Kinesis。

下面是从 Unity Catalog 和 Hive 元存储表读取数据的示例。

从 Unity Catalog 表批量引入

SQL

CREATE OR REFRESH MATERIALIZED VIEW
  table_name
AS SELECT
  *
FROM
  my_catalog.my_schema.table1;

Python

@dlt.table
def table_name():
  return spark.table("my_catalog.my_schema.table")

流式处理 Unity Catalog 表中的更改

SQL

CREATE OR REFRESH STREAMING TABLE
  table_name
AS SELECT
  *
FROM
  STREAM(my_catalog.my_schema.table1);

Python

@dlt.table
def table_name():
  return spark.readStream.table("my_catalog.my_schema.table")

从 Hive 元存储引入数据

使用 Unity Catalog 的管道可以使用 hive_metastore 目录从 Hive 元存储表读取数据:

SQL

CREATE OR REFRESH MATERIALIZED VIEW
  table_name
AS SELECT
  *
FROM
  hive_metastore.some_schema.table;

Python

@dlt.table
def table3():
  return spark.table("hive_metastore.some_schema.table")

从自动加载程序引入数据

SQL

CREATE OR REFRESH STREAMING TABLE
  table_name
AS SELECT
  *
FROM
  cloud_files(
    <path-to-uc-external-location>,
    "json"
  )

Python

@dlt.table(table_properties={"quality": "bronze"})
def table_name():
  return (
     spark.readStream.format("cloudFiles")
     .option("cloudFiles.format", "json")
     .load(f"{path_to_uc_external_location}")
 )

共享具体化视图

默认情况下,管道创建的表只能由管道所有者查询。 可以使用 GRANT 语句授予其他用户查询表的能力,并可以使用 REVOKE 语句撤销查询访问权限。 有关 Unity Catalog 中的特权的详细信息,请参阅管理 Unity Catalog 中的特权

授予对表的 SELECT 权限

GRANT SELECT ON TABLE
  my_catalog.my_schema.table_name
TO
  `user@databricks.com`

撤销对表的 SELECT 权限

REVOKE SELECT ON TABLE
  my_catalog.my_schema.table_name
FROM
  `user@databricks.com`

授予创建表或创建具体化视图的权限

GRANT CREATE { MATERIALIZED VIEW | TABLE } ON SCHEMA
  my_catalog.my_schema
TO
  { principal | user }

查看管道的世系

增量实时表管道中表的世系在目录资源管理器中可见。 对于已启用 Unity Catalog 的管道中的具体化视图或流式表,目录资源管理器世系 UI 会显示上游表和下游表。 若要详细了解 Unity Catalog 世系,请参阅捕获和查看 Unity Catalog 的数据世系

对于已启用 Unity Catalog 的增量实时表管道中的具体化视图或流式表,如果可以从当前工作区访问管道,则目录资源管理器世系 UI 还会链接到生成了具体化视图或流式表的管道。

在流式表中添加、更改或删除数据

可使用数据操作语言 (DML) 语句(包括插入、更新、删除和合并语句)来修改发布到 Unity Catalog 的流式表。 支持对流式表进行 DML 查询,这可实现一些用例,例如更新表来使其符合一般数据保护条例 (GDPR)。

注意

  • 不支持 DML 声明修改流式表的表架构。 确保 DML 语句不会尝试修改表架构。
  • 只能在使用 Databricks Runtime 13.3 LTS 及更高版本的 Unity Catalog 共享群集或 SQL 仓库中运行用于更新流式表的 DML 语句。
  • 由于流式传输要求仅追加数据源,因此如果你的处理需要从包含更改的源流式传输表进行流式传输(例如,通过 DML 语句),请在读取源流式传输表时设置 skipChangeCommits 标志。 设置 skipChangeCommits 后,将会忽略删除或修改源表上记录的事务。 如果你的处理不需要流式表,则可使用具体化视图(没有仅追加限制)作为目标表。

下面是用于修改流式表中的记录的 DML 语句示例。

删除具有特定 ID 的记录:

DELETE FROM my_streaming_table WHERE id = 123;

更新具有特定 ID 的记录:

UPDATE my_streaming_table SET name = 'Jane Doe' WHERE id = 123;