Azure Databricks 上的结构化流式处理模式

其中包含用于在 Azure Databricks 上使用结构化流式处理的常见模式的笔记本和代码示例。

结构化流式处理入门

如果你不熟悉结构化流式处理,请参阅运行第一个结构化流式处理工作负载

作为 Python 中的结构化流式处理接收器写入 Cassandra

Apache Cassandra 是一个分布式、低延迟、可缩放且高度可用的 OLTP 数据库。

结构化流式处理通过 Spark Cassandra 连接器与 Cassandra 配合使用。 此连接器同时支持 RDD 和数据帧 API,并且具有对写入流式处理数据的原生支持。 重要提示 必须使用相应版本的 spark-cassandra-connector-assembly

以下示例连接到 Cassandra 数据库群集中的一个或多个主机。 它还指定连接配置,例如检查点位置以及特定的密钥空间和表名称:

spark.conf.set("spark.cassandra.connection.host", "host1,host2")

df.writeStream \
  .format("org.apache.spark.sql.cassandra") \
  .outputMode("append") \
  .option("checkpointLocation", "/path/to/checkpoint") \
  .option("keyspace", "keyspace_name") \
  .option("table", "table_name") \
  .start()

使用 Python 中的 foreachBatch() 写入到 Azure Synapse Analytics

streamingDF.writeStream.foreachBatch() 允许你重复使用现有批数据写入器将流式处理查询的输出写入 Azure Synapse Analytics。 有关详细信息,请参阅 foreachBatch 文档

若要运行此示例,需要 Azure Synapse Analytics 连接器。 有关 Azure Synapse Analytics 连接器的详细信息,请参阅 Azure Synapse Analytics 中的查询数据

from pyspark.sql.functions import *
from pyspark.sql import *

def writeToSQLWarehouse(df, epochId):
  df.write \
    .format("com.databricks.spark.sqldw") \
    .mode('overwrite') \
    .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", "my_table_in_dw_copy") \
    .option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.chinacloudapi.cn/<your-directory-name>") \
    .save()

spark.conf.set("spark.sql.shuffle.partitions", "1")

query = (
  spark.readStream.format("rate").load()
    .selectExpr("value % 10 as key")
    .groupBy("key")
    .count()
    .toDF("key", "count")
    .writeStream
    .foreachBatch(writeToSQLWarehouse)
    .outputMode("update")
    .start()
    )

流之间的联接

这两个笔记本显示了如何在 Python 和 Scala 中使用流间联接。

对 Python 笔记本进行流间联接

获取笔记本

对 Scala 笔记本进行流间联接

获取笔记本