create_sink

重要

Lakeflow 声明性管道create_sink API 处于公开预览阶段。

函数 create_sink() 将数据从声明性管道写入事件流处理服务(如 Apache Kafka 或 Azure 事件中心)或 Delta 表。 使用 create_sink() 函数创建接收器后,在 追加流 中使用接收器将数据写入接收器。 使用create_sink()函数时,只有追加流这一种流类型受到支持。 不支持其他流类型,例如 create_auto_cdc_flow

Delta 数据汇集器支持 Unity Catalog 外部表和托管表以及 Hive 元数据存储托管表。 表名必须完全限定。 例如,Unity 目录表必须使用三层标识符: <catalog>.<schema>.<table> Hive 元存储表必须使用 <schema>.<table>

注释

  • 运行 完全刷新更新 不会清除汇聚点中的数据。 任何重新处理的数据都将追加到接收器,并且不会更改现有数据。
  • API 不支持 sink Lakeflow 声明性管道预期。

Syntax

from pyspark import pipelines as dp

dp.create_sink(name=<sink_name>, format=<format>, options=<options>)

参数

参数 类型 Description
name str 必填。 标识接收器并用于引用和管理接收器的字符串。 汇集点名称必须是管道内唯一的,包括所有属于管道的源代码文件。
format str 必填。 定义输出格式的字符串,或 kafkadelta
options dict 接收器选项的列表,格式为 {"key": "value"},其中键和值都是字符串。 支持 Kafka 和 Delta 接收器支持的所有 Databricks 运行时选项。

例子

from pyspark import pipelines as dp

# Create a Kafka sink
dp.create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

# Create an external Delta table sink with a file path
dp.create_sink(
  "my_delta_sink",
    "delta",
    { "path": "/path/to/my/delta/table" }
)

# Create a Delta table sink using a table name
dp.create_sink(
  "my_delta_sink",
    "delta",
    { "tableName": "my_catalog.my_schema.my_table" }
)