Production considerations for Structured Streaming

This article contains recommendations for scheduling Structured Streaming workloads using jobs on Azure Databricks.

Databricks recommends always doing the following:

  • Remove unnecessary code from notebooks that would return results, such as display and count.
  • Do not run Structured Streaming workloads using all-purpose compute. Always schedule streams as jobs using jobs compute.
  • Schedule jobs using Continuous mode.
  • Do not enable auto-scaling for compute for Structured Streaming jobs.

Some workloads benefit from the following:

Azure Databricks has introduced Delta Live Tables to reduce the complexities of managing production infrastructure for Structured Streaming workloads. Databricks recommends using Delta Live Tables for new Structured Streaming pipelines. See What is Delta Live Tables?.

Note

Compute auto-scaling has limitations scaling down cluster size for Structured Streaming workloads. Databricks recommends using Delta Live Tables with enhanced autoscaling for streaming workloads. See Optimize the cluster utilization of Delta Live Tables pipelines with enhanced autoscaling.

Design streaming workloads to expect failure

Databricks recommends always configuring streaming jobs to automatically restart on failure. Some functionality, including schema evolution, assumes that Structured Streaming workloads are configured to retry automatically. See Configure Structured Streaming jobs to restart streaming queries on failure.

Some operations, such as foreachBatch, provide at-least-once rather than exactly-once guarantees. For these operations, make sure that your processing pipeline is idempotent. See Use foreachBatch to write to arbitrary data sinks.
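As a rough illustration of idempotent processing, the following sketch assumes the sink is a Delta table and relies on the Delta writer options txnAppId and txnVersion, so that a replayed micro-batch is ignored instead of being appended twice. The application ID, table name, and function name are hypothetical and chosen only for this example.

```python
# Hypothetical identifiers, chosen only for this illustration.
APP_ID = "orders-stream"        # must stay the same across restarts of the query
TARGET_TABLE = "orders_silver"  # assumed to be a Delta table


def write_idempotent(batch_df, batch_id):
    """Append one micro-batch; Delta ignores a replay of the same batch_id."""
    (
        batch_df.write.format("delta")
        .option("txnAppId", APP_ID)      # identifies the writing application
        .option("txnVersion", batch_id)  # increases with each micro-batch
        .mode("append")
        .saveAsTable(TARGET_TABLE)
    )


# Wiring the function into a stream needs a Spark session, so it is shown
# here only as a comment:
# streaming_df.writeStream.foreachBatch(write_idempotent).start()
```

Other sinks need their own idempotency strategy, for example a merge or upsert keyed on a unique identifier.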

Note

When a query restarts, it processes the micro-batch that was planned during the previous run. If your job failed due to an out-of-memory error, or you manually canceled a job due to an oversized micro-batch, you might need to scale up the compute to successfully process that micro-batch.

If you change configurations between runs, these configurations apply to the first new batch planned.

When does a job retry?

You can schedule multiple tasks as part of an Azure Databricks job. When you configure a job using the continuous trigger, you cannot set dependencies between tasks.

You might choose to schedule multiple streams in a single job using one of the following approaches:

  • Multiple tasks: Define a job with multiple tasks that run streaming workloads using the continuous trigger.
  • Multiple queries: Define multiple streaming queries in the source code for a single task.

You can also combine these strategies. The following table compares these approaches.

| | Multiple tasks | Multiple queries |
|---|---|---|
| How is compute shared? | Databricks recommends deploying jobs compute appropriately sized to each streaming task. You can optionally share compute across tasks. | All queries share the same compute. You can optionally assign queries to scheduler pools. |
| How are retries handled? | All tasks must fail before the job retries. | The task retries if any query fails. |

Configure Structured Streaming jobs to restart streaming queries on failure

Databricks recommends configuring all streaming workloads using the continuous trigger. See Run jobs continuously.

The continuous trigger provides the following behavior by default:

  • Prevents more than one concurrent run of the job.
  • Starts a new run when a previous run fails.
  • Uses exponential backoff for retries.

Databricks recommends always using jobs compute instead of all-purpose compute when scheduling workflows. On job failure and retry, new compute resources deploy.
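The recommendations above can be sketched as a job definition. The following example builds a request body in the shape of the Jobs API, with a continuous trigger, one fixed-size jobs compute cluster per task, and no dependencies between tasks. The helper name, job name, notebook paths, runtime version, node type, and worker count are placeholders, and the field names reflect one reading of the Jobs API, so verify them against the API reference before use.

```python
import json


def continuous_job_payload(job_name, notebook_paths):
    """Build a Jobs API-style request body for a continuously running job."""
    tasks = []
    for index, path in enumerate(notebook_paths):
        tasks.append({
            "task_key": f"stream_{index}",
            "notebook_task": {"notebook_path": path},
            # Jobs compute sized per task; fixed worker count, no autoscaling.
            "new_cluster": {
                "spark_version": "15.4.x-scala2.12",  # placeholder runtime
                "node_type_id": "Standard_DS3_v2",    # placeholder node type
                "num_workers": 2,                     # placeholder size
            },
        })
    return {
        "name": job_name,
        # Continuous trigger: a new run starts when the previous run fails.
        "continuous": {"pause_status": "UNPAUSED"},
        # Tasks carry no "depends_on" because continuous jobs cannot use
        # task dependencies.
        "tasks": tasks,
    }


payload = continuous_job_payload(
    "orders-streams",
    ["/Workspace/streams/orders", "/Workspace/streams/payments"],
)
print(json.dumps(payload, indent=2))
```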

Note

You do not need to use streamingQuery.awaitTermination() or spark.streams.awaitAnyTermination(). Jobs automatically prevent a run from completing when a streaming query is active.

Use scheduler pools for multiple streaming queries

You can configure scheduler pools to assign compute capacity to queries when running multiple streaming queries from the same source code.

By default, all queries started in a notebook run in the same fair scheduling pool. Apache Spark jobs generated by triggers from all of the streaming queries in a notebook run one after another in "first in, first out" (FIFO) order. This can cause unnecessary delays in the queries, because they are not efficiently sharing the cluster resources.

Scheduler pools allow you to declare which Structured Streaming queries share compute resources.

The following example assigns query1 to a dedicated pool, while query2 and query3 share a scheduler pool.

# Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").toTable("table1")

# Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").toTable("table2")

# Run streaming query3 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query3").toTable("table3")

Note

The local property configuration must be in the same notebook cell where you start your streaming query.
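One way to keep the pool assignment and the query start together is to wrap both in a single function, as in the minimal sketch below. The helper name and the checkpoint_path parameter are assumptions added for illustration; the Spark calls are the same ones used in the preceding example, plus a checkpointLocation option.

```python
def start_query_in_pool(spark, df, pool, query_name, table, checkpoint_path):
    """Set the scheduler pool, then start the streaming query in the same call."""
    # The local property applies to work submitted from the current thread,
    # so set it immediately before starting the query.
    spark.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
    return (
        df.writeStream
        .queryName(query_name)
        .option("checkpointLocation", checkpoint_path)
        .toTable(table)
    )


# Example calls (need an active Spark session and a streaming DataFrame):
# start_query_in_pool(spark, df, "pool1", "query1", "table1", "/checkpoints/query1")
# start_query_in_pool(spark, df, "pool2", "query2", "table2", "/checkpoints/query2")
```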

See Apache fair scheduler documentation for more details.