append_flow

@dp.append_flow修饰器为 Lakeflow 声明性管道表创建追加流或回填。 该函数必须返回 Apache Spark 流式处理数据帧。 请参阅 使用 Lakeflow 声明性管道流以增量方式加载和处理数据

追加流可以面向流式处理表或接收器。

Syntax

from pyspark import pipelines as dp

dp.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.

@dp.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  once = <boolean>, # optional, defaults to false
  spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
  comment = "<comment>") # optional
def <function-name>():
  return (<streaming-query>) #

参数

参数 类型 Description
函数 function 必填。 从用户定义的查询返回 Apache Spark 流式处理数据帧的函数。
target str 必填。 作为追加流的目标的表或接收器的名称。
name str 流名称。 如果未提供,则默认为函数名称。
once bool (可选)将流定义为一次性流,例如回填。 通过两种方式使用 once=True 更改流:
  • 返回值。 streaming-query。 在这种情况下,必须是批处理数据帧,而不是流式处理数据帧。
  • 默认情况运行一次。 如果管道通过完全刷新进行更新,则 ONCE 流会再次运行以重新创建数据。
comment str 流程的描述。
spark_conf dict 用于执行此查询的 Spark 配置列表

例子

from pyspark import pipelines as dp

# Create a sink for an external Delta table
dp.create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})

# Add an append flow to an external Delta table
@dp.append_flow(name = "flow", target = "my_sink")
def flowFunc():
  return <streaming-query>

# Add a backfill
@dp.append_flow(name = "backfill", target = "my_sink", once = True)
def backfillFlowFunc():
    return (
      spark.read
      .format("json")
      .load("/path/to/backfill/")
    )

# Create a Kafka sink
dp.create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

# Add an append flow to a Kafka sink
@dp.append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
  return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))