其中包含用于在 Azure Databricks 上使用结构化流式处理的常见模式的笔记本和代码示例。
如果你不熟悉结构化流式处理,请参阅运行第一个结构化流式处理工作负载。
Apache Cassandra 是一个分布式、低延迟、可缩放且高度可用的 OLTP 数据库。
结构化流式处理通过 Spark Cassandra 连接器与 Cassandra 配合使用。 此连接器同时支持 RDD 和数据帧 API,并且具有对写入流式处理数据的原生支持。 重要提示 必须使用相应版本的 spark-cassandra-connector-assembly。
以下示例连接到 Cassandra 数据库群集中的一个或多个主机。 它还指定连接配置,例如检查点位置以及特定的密钥空间和表名称:
spark.conf.set("spark.cassandra.connection.host", "host1,host2")
df.writeStream \
.format("org.apache.spark.sql.cassandra") \
.outputMode("append") \
.option("checkpointLocation", "/path/to/checkpoint") \
.option("keyspace", "keyspace_name") \
.option("table", "table_name") \
.start()
streamingDF.writeStream.foreachBatch()
允许你重复使用现有批数据写入器将流式处理查询的输出写入 Azure Synapse Analytics。 有关详细信息,请参阅 foreachBatch 文档。
若要运行此示例,需要 Azure Synapse Analytics 连接器。 有关 Azure Synapse Analytics 连接器的详细信息,请参阅 Azure Synapse Analytics 中的查询数据。
from pyspark.sql.functions import *
from pyspark.sql import *
def writeToSQLWarehouse(df, epochId):
df.write \
.format("com.databricks.spark.sqldw") \
.mode('overwrite') \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("forward_spark_azure_storage_credentials", "true") \
.option("dbtable", "my_table_in_dw_copy") \
.option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.chinacloudapi.cn/<your-directory-name>") \
.save()
spark.conf.set("spark.sql.shuffle.partitions", "1")
query = (
spark.readStream.format("rate").load()
.selectExpr("value % 10 as key")
.groupBy("key")
.count()
.toDF("key", "count")
.writeStream
.foreachBatch(writeToSQLWarehouse)
.outputMode("update")
.start()
)
这两个笔记本显示了如何在 Python 和 Scala 中使用流间联接。