将 Unity Catalog 与结构化流式处理结合使用

本页介绍如何在 Azure Databricks 上使用 Structured Streaming 与 Unity Catalog 来管理增量和流式工作负载的数据治理。

Unity Catalog 支持哪些结构化流式处理功能?

Unity Catalog 不会为 Azure Databricks 上可用的结构化流源和输出端添加任何显式限制。

使用 Unity Catalog 和 Structured Streaming,您可以:

  • 从托管表及外部表流式传输数据。 请参阅 Azure Databricks 中 Delta Lake 和 Apache Iceberg 的 Unity Catalog 托管表
  • 使用 Unity 目录管理的外部位置通过对象存储 URI 与数据进行交互。
  • 使用表名或文件路径写入外部表。 若要与托管表交互,必须使用表名称。

对于结构化流检查点,必须使用由 Unity Catalog 管理的外部位置中的路径。 若要详细了解如何使用 Unity 目录安全地连接存储,请参阅 使用 Unity 目录连接到云对象存储

将 Unity Catalog 视图读取为流

在 Databricks Runtime 14.3 LTS 及更高版本中,您可以使用结构化流式处理读取在 Unity Catalog 中注册的视图。 基础表必须使用 Delta Lake 格式。 有关其他限制,请参阅 “限制”。

若要使用结构化流读取视图,请使用视图的标识符调用 .table() 方法:

df = (spark.readStream
  .table("demoView")
)

用户必须对目标视图拥有 SELECT 权限。

如果修改视图定义以添加或更改视图中引用的表,则不能使用相同的流式处理检查点。

支持的流送选项

流式读取器对指定视图的底层 Delta 表的文件和元数据应用选项。

支持以下选项:

  • maxFilesPerTrigger
  • maxBytesPerTrigger
  • ignoreDeletes
  • skipChangeCommits
  • withEventTimeOrder
  • startingTimestamp
  • startingVersion

对带有 UNION ALL 的视图执行读取操作时,不支持 withEventTimeOrderstartingVersion 选项。

如果提供不受支持的选项,例如 readChangeFeed,Spark 将引发此异常:

AnalysisException: [UNSUPPORTED_STREAMING_OPTIONS_FOR_VIEW.UNSUPPORTED_OPTION] Unsupported for streaming a view. Reason: option <option> is not supported.

支持的流式处理操作

支持的操作包括:

Operation Description Operator Example
项目 控制列级权限 SELECT... FROM... CREATE VIEW project_view AS SELECT id, value FROM source_table
筛选器 控制行级权限 WHERE... CREATE VIEW filter_view AS SELECT * FROM source_table WHERE value > 100
全部联合 来自多个表的结果 UNION ALL CREATE VIEW union_view AS SELECT id, value FROM source_table1 UNION ALL SELECT * FROM source_table2

不支持的操作包括聚合、排序和表值函数,例如 table_changes()。 有关表值函数的详细信息,请参阅 表值函数 (TVF) 调用

如果你从包含不受支持操作的视图进行流式读取,Spark 会引发此异常:

UnsupportedOperationException: [UNEXPECTED_OPERATOR_IN_STREAMING_VIEW] Unexpected operator <operator> in the CREATE VIEW statement as a streaming source. A streaming view query must consist only of SELECT, WHERE, and UNION ALL operations.

限制