重要
Lakeflow 声明性管道create_sink
API 处于公开预览阶段。
函数 create_sink()
将数据从声明性管道写入事件流处理服务(如 Apache Kafka 或 Azure 事件中心)或 Delta 表。 使用 create_sink()
函数创建接收器后,在 追加流 中使用接收器将数据写入接收器。 使用create_sink()
函数时,只有追加流这一种流类型受到支持。 不支持其他流类型,例如 create_auto_cdc_flow
。
Delta 数据汇集器支持 Unity Catalog 外部表和托管表以及 Hive 元数据存储托管表。 表名必须完全限定。 例如,Unity 目录表必须使用三层标识符: <catalog>.<schema>.<table>
Hive 元存储表必须使用 <schema>.<table>
。
注释
- 运行 完全刷新更新 不会清除汇聚点中的数据。 任何重新处理的数据都将追加到接收器,并且不会更改现有数据。
- API 不支持
sink
Lakeflow 声明性管道预期。
Syntax
from pyspark import pipelines as dp
dp.create_sink(name=<sink_name>, format=<format>, options=<options>)
参数
参数 | 类型 | Description |
---|---|---|
name |
str |
必填。 标识接收器并用于引用和管理接收器的字符串。 汇集点名称必须是管道内唯一的,包括所有属于管道的源代码文件。 |
format |
str |
必填。 定义输出格式的字符串,或 kafka delta 。 |
options |
dict |
接收器选项的列表,格式为 {"key": "value"} ,其中键和值都是字符串。 支持 Kafka 和 Delta 接收器支持的所有 Databricks 运行时选项。
|
例子
from pyspark import pipelines as dp
# Create a Kafka sink
dp.create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)
# Create an external Delta table sink with a file path
dp.create_sink(
"my_delta_sink",
"delta",
{ "path": "/path/to/my/delta/table" }
)
# Create a Delta table sink using a table name
dp.create_sink(
"my_delta_sink",
"delta",
{ "tableName": "my_catalog.my_schema.my_table" }
)