增量实时表 Python 语言参考
本文提供了有关 Delta Live Tables Python 编程接口的详细信息。
有关 SQL API 的信息,请参阅增量实时表 SQL 语言参考。
有关配置自动加载程序的详细信息,请参阅什么是自动加载程序?。
限制
Delta Live Tables Python 接口具有以下限制:
- Python
table
和view
函数必须返回数据帧。 某些对数据帧进行操作的函数不返回数据帧,因此不应使用。 由于数据帧转换是在解析完整数据流图后执行的,因此使用此类操作可能会产生意想不到的副作用。 这些操作包括collect()
、count()
、toPandas()
、save()
、saveAsTable()
等函数。 但是,你可以在table
或view
函数定义之外包括这些函数,因为此代码在图形初始化阶段运行一次。 - 不支持
pivot()
函数。 Spark 中的pivot
操作需要预先加载输入数据以计算输出架构。 Delta Live Tables 不支持此功能。
导入 dlt
Python 模块
增量实时表 Python 函数在 dlt
模块中定义。 利用 Python API 实现的管道必须导入此模块:
import dlt
创建 Delta Live Tables 具体化视图或流式处理表
在 Python 中,Delta Live Tables 根据定义查询来确定是将数据集更新为具体化视图还是流式处理表。 @table
修饰器用于定义具体化视图和流式处理表。
若要在 Python 中定义具体化视图,请将 @table
应用于对数据源执行静态读取的查询。 若要定义流式处理表,请将 @table
应用于对数据源执行流式读取的查询。 这两种数据集类型具有相同的语法规范,如下所示:
import dlt
@dlt.table(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value", "<key" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
schema="schema-definition",
temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
创建 Delta Live Tables 视图
要在 Python 中定义视图,请应用 @view
装饰器。 与 @table
修饰器一样,可以将 Delta Live Tables 中的视图用于静态或流式处理数据集。 下面是使用 Python 来定义视图的语法:
import dlt
@dlt.view(
name="<name>",
comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
示例:定义表和视图
若要在 Python 中定义表或视图,请将 @dlt.view
或 @dlt.table
修饰器应用于函数。 你可以使用函数名称或 name
参数来分配表或视图名称。 以下示例定义了两个不同的数据集:一个将 JSON 文件作为输入源的 taxi_raw
视图,一个将 taxi_raw
视图作为输入的 filtered_data
表:
import dlt
@dlt.view
def taxi_raw():
return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")
# Use the function name as the table name
@dlt.table
def filtered_data():
return dlt.read("taxi_raw").where(...)
# Use the name parameter as the table name
@dlt.table(
name="filtered_data")
def create_filtered_data():
return dlt.read("taxi_raw").where(...)
示例:访问在同一管道中定义的数据集
除了从外部数据源读取数据外,还可以使用 Delta Live Tables read()
函数访问同一管道中定义的数据集。 以下示例演示如何使用 read()
函数创建 customers_filtered
数据集:
@dlt.table
def customers_raw():
return spark.read.format("csv").load("/data/customers.csv")
@dlt.table
def customers_filteredA():
return dlt.read("customers_raw").where(...)
还可以使用 spark.table()
函数访问同一管道中定义的数据集。 使用 spark.table()
函数访问管道中定义的数据集时,在函数参数中的数据集名称前加上 LIVE
关键字:
@dlt.table
def customers_raw():
return spark.read.format("csv").load("/data/customers.csv")
@dlt.table
def customers_filteredB():
return spark.table("LIVE.customers_raw").where(...)
示例:从元存储中注册的表读取
如果要从 Hive 元存储中注册的表读取数据,请在函数参数中忽略 LIVE
关键字,并选择性地使用数据库名称来限定表名称:
@dlt.table
def customers():
return spark.table("sales.customers").where(...)
若要通过示例来了解如何从 Unity Catalog 表读取数据,请参阅将数据引入 Unity Catalog 管道。
示例:使用 spark.sql
访问数据集
你还可以在查询函数中使用 spark.sql
表达式返回数据集。 若要从内部数据集读取数据,请在数据集名称前追加 LIVE.
:
@dlt.table
def chicago_customers():
return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Shanghai'")
从多个源流写入流式处理表
重要
现以公共预览版的形式提供对 @append_flow
的增量实时表支持。
可使用 @append_flow
修饰器从多个流式处理源写入流式处理表来完成以下操作:
- 添加和移除将数据追加到现有流式处理表的流式处理源,无需完全刷新。 例如,你可能有一个表,其中组合了你操作的每个区域中的区域数据。 新区域推出后,你无需执行完全刷新即可将新区域数据添加到该表中。
- 通过追加缺失的历史数据(回填)来更新流式处理表。 例如,你现在有一个由 Apache Kafka 主题写入的流式处理表。 此外,表中还存储了历史数据,这些数据只需要插入一次到流式处理表中,并且你无法流式传输数据,因为在插入数据之前需要执行复杂的聚合。
若要为 @append_flow
处理所输出的记录创建目标表,请使用 create_streaming_table() 函数。
注意
如果需要使用期望定义数据质量约束,请将目标表的期望定义为 create_streaming_table()
函数的一部分。 无法在 @append_flow
定义中定义期望。
下面是 @append_flow
的语法:
import dlt
dlt.create_streaming_table("<target-table-name>")
@dlt.append_flow(
target = "<target-table-name>",
name = "<flow-name>", # optional, defaults to function name
spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
comment = "<comment") # optional
def <function-name>():
return (<streaming query>)
示例:从多个 Kafka 主题写入流式处理表
以下示例会创建一个名为 kafka_target
的流式处理表,并从两个 Kafka 主题写入该流式处理表:
import dlt
dlt.create_streaming_table("kafka_target")
# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
return (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic1")
.load()
)
@dlt.append_flow(target = "kafka_target")
def topic2():
return (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic2")
.load()
)
示例:运行一次性数据回填
以下示例会运行一个查询来将历史数据追加到流式处理表中:
注意
为了确保当回填查询是按计划运行或连续运行的管道的一部分时确实进行一次性回填,请在运行一次管道后移除该查询。 若要在数据到达回填目录中时追加新数据,请保留查询。
import dlt
@dlt.table()
def csv_target():
return spark.readStream.format("csv").load("path/to/sourceDir")
@dlt.append_flow(target = "csv_target")
def backfill():
return spark.readStream.format("csv").load("path/to/backfill/data/dir")
创建一个表,用作流式处理操作的目标
使用 create_streaming_table()
函数为流式处理操作输出的记录(包括 apply_changes() 和 @append_flow 输出记录)创建目标表。
注意
create_target_table()
和 create_streaming_live_table()
函数已弃用。 Databricks 建议更新现有代码以使用 create_streaming_table()
函数。
create_streaming_table(
name = "<table-name>",
comment = "<comment>"
spark_conf={"<key>" : "<value", "<key" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
partition_cols=["<partition-column>", "<partition-column>"],
path="<storage-location-path>",
schema="schema-definition",
expect_all = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"}
)
参数 |
---|
name 类型: str 表名称。 此参数是必需的。 |
comment 类型: str 表的可选说明。 |
spark_conf 类型: dict 用于执行此查询的 Spark 配置的可选列表。 |
table_properties 类型: dict 表的表属性可选列表。 |
partition_cols 类型: array 包含一列或多列的可选列表,用于对表进行分区。 |
path 类型: str 表数据的可选存储位置。 如果未设置,系统将默认为管道存储位置。 |
schema 类型: str 或 StructType 表的可选架构定义。 架构可以定义为 SQL DDL 字符串,或使用 Python 定义 StructType 。 |
expect_all expect_all_or_drop expect_all_or_fail 类型: dict 表的可选数据质量约束。 请参阅多个期望。 |
控制表的具体化方式
表还提供对其具体化的额外控制:
- 指定如何使用
partition_cols
对表进行分区。 可以使用分区来加快查询速度。 - 可以在定义视图或表时设置表属性。 请参阅 Delta Live Tables 表属性。
- 使用
path
设置为表数据设置存储位置。 默认情况下,如果未设置path
,表数据会存储在管道存储位置。 - 可在架构定义中使用生成的列。 请参阅示例:指定架构和分区列。
注意
对于小于 1 TB 的表,Databricks 建议让增量实时表控制数据组织方式。 除非你预期表会增长到超过 1 TB,否则一般情况下不应指定分区列。
示例:指定架构和分区列
可以选择性地使用 Python StructType
或 SQL DDL 字符串指定表架构。 如果使用 DDL 字符串指定了表架构,则定义可以包括生成的列。
以下示例使用一个通过 Python StructType
指定的架构创建名为 sales
的表:
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dlt.table(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")
以下示例使用 DDL 字符串指定表的架构,定义生成的列,并定义分区列:
@dlt.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
partition_cols = ["order_day_of_week"])
def sales():
return ("...")
默认情况下,如果未指定架构,则增量实时表将从 table
定义推断架构。
将流式处理表配置为忽略源流式处理表中的更改
注意
skipChangeCommits
标志仅适用于使用option()
函数的spark.readStream
。 不能在dlt.read_stream()
函数中使用此标志。- 当源流式处理表定义为 apply_changes() 函数的目标时,无法使用
skipChangeCommits
标志。
默认情况下,流式处理表需要“仅追加”源。 如果一个流式处理表使用另一个流式处理表作为源,而源流式处理表需要执行更新或删除操作(例如 GDPR 的“被遗忘权”处理),可以在读取源流式处理表时设置 skipChangeCommits
标志来忽略那些更改。 有关此标志的详细信息,请参阅忽略更新和删除。
@table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")
Python Delta Live Tables 属性
下表描述了在使用 Delta Live Tables 定义表和视图时可以指定的选项和属性:
@table 或 @view |
---|
name 类型: str 表或视图的可选名称。 如果未定义,将使用函数名称作为表名或视图名称。 |
comment 类型: str 表的可选说明。 |
spark_conf 类型: dict 用于执行此查询的 Spark 配置的可选列表。 |
table_properties 类型: dict 表的表属性可选列表。 |
path 类型: str 表数据的可选存储位置。 如果未设置,系统将默认为管道存储位置。 |
partition_cols 类型: a collection of str 包含一列或多列的可选集合(例如 list ),用于对表进行分区。 |
schema 类型: str 或 StructType 表的可选架构定义。 架构可以定义为 SQL DDL 字符串,或使用 Python 定义 StructType 。 |
temporary 类型: bool 创建表,但不发布表的元数据。 temporary 关键字指示 Delta Live Tables 创建可用于管道但不应在管道外部访问的表。 为了缩短处理时间,临时表会在创建它的管道的生存期内持久保留,而不仅仅是一次更新。默认值为“False”。 |
表或视图定义 |
---|
def <function-name>() 用于定义数据集的 Python 函数。 如果未设置 name 参数,则使用 <function-name> 作为目标数据集名称。 |
query 一个 Spark SQL 语句,它返回 Spark Dataset 或 Koalas DataFrame。 使用 dlt.read() 或 spark.table() 从同一管道中定义的数据集执行完整读取操作。 使用 spark.table() 函数从同一管道中定义的数据集读取数据时,在函数参数中的数据集名称前加上 LIVE 关键字。 例如,从名为 customers 的数据集读取数据:spark.table("LIVE.customers") 还可以使用 spark.table() 函数从元存储中注册的表中读取数据,方法是省略 LIVE 关键字,并选择性地使用数据库名称限定表名称:spark.table("sales.customers") 使用 dlt.read_stream() 从同一管道中定义的数据集执行流式读取操作。使用 spark.sql 函数定义 SQL 查询,以创建返回数据集。使用 PySpark 语法通过 Python 定义 Delta Live Tables 查询。 |
预期 |
---|
@expect("description", "constraint") 声明由以下参数确定的数据质量约束: description 。 如果某行违反了预期,则在目标数据集中包含该行。 |
@expect_or_drop("description", "constraint") 声明由以下参数确定的数据质量约束: description 。 如果某行违反了预期,则从目标数据集中删除该行。 |
@expect_or_fail("description", "constraint") 声明由以下参数确定的数据质量约束: description 。 如果某行违反了预期,则立即停止执行。 |
@expect_all(expectations) 声明一个或多个数据质量约束。 expectations 是一个 Python 字典,其中的键是预期说明,值是预期约束。 如果某行违反了其中一个预期,则在目标数据集中包含该行。 |
@expect_all_or_drop(expectations) 声明一个或多个数据质量约束。 expectations 是一个 Python 字典,其中的键是预期说明,值是预期约束。 如果某行违反了任何预期,则从目标数据集中删除该行。 |
@expect_all_or_fail(expectations) 声明一个或多个数据质量约束。 expectations 是一个 Python 字典,其中的键是预期说明,值是预期约束。 如果某行违反了任何预期,则立即停止执行。 |
在 Delta Live Tables 中使用 Python 进行的变更数据捕获
通过 Python API 中的 apply_changes()
函数使用增量实时表 CDC 功能。 Delta Live Tables Python 接口还提供 create_streaming_table() 函数。 可以使用此函数创建 apply_changes()
函数所需的目标表。
apply_changes(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = False,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
注意
INSERT
和 UPDATE
事件的默认行为是从源更新插入 CDC 事件:更新目标表中与指定键匹配的任何行或在目标表中不存在匹配记录时插入新行。 可以使用 APPLY AS DELETE WHEN
条件指定对 DELETE
事件的处理。
重要
必须声明一个要向其应用更改的目标流式处理表。 可以选择为目标表指定架构。 在指定 apply_changes
目标表的架构时,还必须包含具有与 sequence_by
字段相同数据类型的 __START_AT
和 __END_AT
列。
请参阅在增量实时表中使用 APPLY CHANGES API 简化变更数据捕获。
参数 |
---|
target 类型: str 要更新的表的名称。 可以在执行 apply_changes() 函数之前使用 create_streaming_table() 函数创建目标表。此参数是必需的。 |
source 类型: str 包含 CDC 记录的数据源。 此参数是必需的。 |
keys 类型: list 唯一标识源数据中的行的列或列组合。 这用于标识哪些 CDC 事件适用于目标表中的特定记录。 可以指定以下任一项: * 字符串列表: ["userId", "orderId"] * Spark SQL col() 函数列表:[col("userId"), col("orderId"] col() 函数的参数不能包含限定符。 例如,可以使用 col(userId) ,但不能使用 col(source.userId) 。此参数是必需的。 |
sequence_by 类型: str 或 col() 指定源数据中 CDC 事件的逻辑顺序的列名。 增量实时表使用此排序来处理乱序到达的更改事件。 可以指定以下任一项: * 字符串: "sequenceNum" * Spark SQL col() 函数:col("sequenceNum") col() 函数的参数不能包含限定符。 例如,可以使用 col(userId) ,但不能使用 col(source.userId) 。此参数是必需的。 |
ignore_null_updates 类型: bool 允许引入包含目标列子集的更新。 当 CDC 事件与现有行匹配且 ignore_null_updates 为 True 时,具有 null 的列将在目标中保留其现有值。 这也适用于值为 null 的嵌套列。 当 ignore_null_updates 为 False 时,现有值将被 null 值覆盖。此参数是可选的。 默认值为 False 。 |
apply_as_deletes 类型: str 或 expr() 指定何时应将 CDC 事件视为 DELETE 而不是更新插入。 为了处理乱序数据,被删除的行被暂时保留为基础 Delta 表中的无效标记,并在元存储中创建一个视图来筛选掉这些无效标记。 保留间隔可以配置为pipelines.cdc.tombstoneGCThresholdInSeconds 表属性。可以指定以下任一项: * 字符串: "Operation = 'DELETE'" * Spark SQL expr() 函数:expr("Operation = 'DELETE'") 此参数是可选的。 |
apply_as_truncates 类型: str 或 expr() 指定何时应将 CDC 事件视为完整表 TRUNCATE 。 由于此子句会触发目标表的完全截断,因此应仅将其用于需要此功能的特定用例。仅 SCD 类型 1 支持 apply_as_truncates 参数。 SCD 类型 2 不支持截断。可以指定以下任一项: * 字符串: "Operation = 'TRUNCATE'" * Spark SQL expr() 函数:expr("Operation = 'TRUNCATE'") 此参数是可选的。 |
column_list except_column_list 类型: list 要包含在目标表中的列的子集。 使用 column_list 指定要包含的列的完整列表。 使用 except_column_list 指定要排除的列。 可以将任一值声明为字符串列表或 Spark SQL col() 函数:* column_list = ["userId", "name", "city"] 。* column_list = [col("userId"), col("name"), col("city")] * except_column_list = ["operation", "sequenceNum"] * except_column_list = [col("operation"), col("sequenceNum") col() 函数的参数不能包含限定符。 例如,可以使用 col(userId) ,但不能使用 col(source.userId) 。此参数是可选的。 当没有 column_list 或 except_column_list 参数传递给函数时,默认设置是包含目标表中的所有列。 |
stored_as_scd_type 类型: str 或 int 将记录存储为 SCD 类型 1 还是 SCD 类型 2。 对于 SCD 类型 1,将其设置为 1 ;对于 SCD 类型 2,将其设置为 2 。此子句是可选的。 默认值为 SCD 类型 1。 |
track_history_column_list track_history_except_column_list 类型: list 要在目标表中跟踪其历史记录的输出列子集。 使用 track_history_column_list 指定要跟踪的列的完整列表。 使用使用 track_history_except_column_list 指定要从跟踪中排除的列。 可将任一值声明为字符串列表或 Spark SQL col() 函数:- track_history_column_list = ["userId", "name", "city"] 。 - track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum") col() 函数的参数不能包含限定符。 例如,可以使用 col(userId) ,但不能使用 col(source.userId) 。此参数是可选的。 默认设置是未将 track_history_column_list 或track_history_except_column_list 参数传递给函数时包含目标表中的所有列。 |