Structured Streaming patterns on Azure Databricks
This article contains notebooks and code samples for common patterns for working with Structured Streaming on Azure Databricks.
Getting started with Structured Streaming
If you are brand new to Structured Streaming, see Run your first Structured Streaming workload.
Write to Cassandra as a sink for Structured Streaming in Python
Apache Cassandra is a distributed, low-latency, scalable, highly available OLTP database.
Structured Streaming works with Cassandra through the Spark Cassandra Connector. This connector supports both the RDD and DataFrame APIs, and it has native support for writing streaming data.
Important
You must use the version of the spark-cassandra-connector-assembly that corresponds to your cluster's Spark and Scala versions.
The following example connects to one or more hosts in a Cassandra database cluster. It also specifies connection configurations such as the checkpoint location and the specific keyspace and table names:
spark.conf.set("spark.cassandra.connection.host", "host1,host2")

# df is a streaming DataFrame produced earlier in the pipeline.
df.writeStream \
  .format("org.apache.spark.sql.cassandra") \
  .outputMode("append") \
  .option("checkpointLocation", "/path/to/checkpoint") \
  .option("keyspace", "keyspace_name") \
  .option("table", "table_name") \
  .start()
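For context, a minimal end-to-end sketch might look like the following. It assumes the built-in rate test source and an existing keyspace and table (the names sensor_data and events, and their columns, are placeholders), and that the connector assembly is installed on the cluster:

from pyspark.sql.functions import col

# Hypothetical sketch: stream synthetic rows into an existing Cassandra table.
# "sensor_data" and "events" are placeholder names; the keyspace and table
# (id bigint, ts timestamp) must already exist.
spark.conf.set("spark.cassandra.connection.host", "host1,host2")

df = (
  spark.readStream.format("rate").load()   # test source with value and timestamp columns
    .select(col("value").alias("id"), col("timestamp").alias("ts"))
)

query = (
  df.writeStream
    .format("org.apache.spark.sql.cassandra")
    .outputMode("append")
    .option("checkpointLocation", "/path/to/checkpoint")
    .option("keyspace", "sensor_data")
    .option("table", "events")
    .start()
)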
Write to Azure Synapse Analytics using foreachBatch() in Python
streamingDF.writeStream.foreachBatch() allows you to reuse existing batch data writers to write the output of a streaming query to Azure Synapse Analytics. See the foreachBatch documentation for details.
To run this example, you need the Azure Synapse Analytics connector. For details on the Azure Synapse Analytics connector, see Query data in Azure Synapse Analytics.
from pyspark.sql.functions import *
from pyspark.sql import *

# Writes each micro-batch to Azure Synapse Analytics using the batch connector.
def writeToSQLWarehouse(df, epochId):
  df.write \
    .format("com.databricks.spark.sqldw") \
    .mode('overwrite') \
    .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", "my_table_in_dw_copy") \
    .option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.chinacloudapi.cn/<your-directory-name>") \
    .save()

spark.conf.set("spark.sql.shuffle.partitions", "1")

query = (
  spark.readStream.format("rate").load()    # built-in test source
    .selectExpr("value % 10 as key")
    .groupBy("key")
    .count()
    .toDF("key", "count")
    .writeStream
    .foreachBatch(writeToSQLWarehouse)
    .outputMode("update")
    .start()
)
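start() returns a StreamingQuery handle (bound to query above), which you can use to check progress or stop the stream. For example:

print(query.status)          # current state of the streaming query
print(query.lastProgress)    # metrics from the most recent micro-batch

query.awaitTermination()     # block until the query stops (or call query.stop())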
Stream-stream joins
These two notebooks show how to use stream-stream joins in Python and Scala.
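The notebooks themselves are not reproduced here. As a rough illustration only, the following minimal Python sketch joins two synthetic streams (stand-ins for, say, ad impressions and clicks) using watermarks and a time-range condition so that Spark can discard old state; all names are placeholders:

from pyspark.sql.functions import col, expr

# Two synthetic streams standing in for impressions and clicks.
impressions = (
  spark.readStream.format("rate").option("rowsPerSecond", 5).load()
    .select(col("value").alias("impressionAdId"), col("timestamp").alias("impressionTime"))
    .withWatermark("impressionTime", "10 minutes")
)

clicks = (
  spark.readStream.format("rate").option("rowsPerSecond", 5).load()
    .select(col("value").alias("clickAdId"), col("timestamp").alias("clickTime"))
    .withWatermark("clickTime", "20 minutes")
)

# Inner stream-stream join with a time-range condition so state can be cleaned up.
joined = impressions.join(
  clicks,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """)
)

query = (
  joined.writeStream
    .format("console")       # write matched rows to the console for inspection
    .option("checkpointLocation", "/path/to/join/checkpoint")
    .outputMode("append")
    .start()
)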