使用提供的编写器设置要处理的流式处理查询的输出。 可将处理逻辑指定为将行作为输入的函数,也可以指定为具有可选process(row)方法和open(partition_id, epoch_id)close(error)方法的对象。
Syntax
foreach(f)
参数
| 参数 | 类型 | Description |
|---|---|---|
f |
可调用或对象 | 采用 Row 作为输入的函数,或具有 process(row) 方法和可选 open 方法和 close 方法的对象。 |
退货
DataStreamWriter
注释
所提供的对象必须可序列化。 写入数据的任何初始化(例如,打开连接)都应在内部 open()完成,而不是在构造时完成。
示例
import time
df = spark.readStream.format("rate").load()
使用函数处理每行:
def print_row(row):
print(row)
q = df.writeStream.foreach(print_row).start()
time.sleep(3)
q.stop()
使用对象openprocess和close方法处理每一行:
class RowPrinter:
def open(self, partition_id, epoch_id):
print("Opened %d, %d" % (partition_id, epoch_id))
return True
def process(self, row):
print(row)
def close(self, error):
print("Closed with error: %s" % str(error))
q = df.writeStream.foreach(RowPrinter()).start()
time.sleep(3)
q.stop()