Monitoring your streaming applications' performance, cost, and health is essential to building reliable, efficient ETL pipelines. Azure Databricks provides a rich set of observability features across Jobs, Lakeflow Declarative Pipelines, and Lakeflow Connect to help diagnose bottlenecks, optimize performance, and manage resource usage and costs.
This article outlines best practices in the following areas:
- Key streaming performance metrics
- Event log schemas and example queries
- Streaming query monitoring
- Exporting logs and metrics to external tools
When operating streaming pipelines, monitor the following key metrics:
| Metric | Purpose |
|---|---|
| Backpressure | Tracks the backlog of files and offsets (sizes) waiting to be processed. Helps identify bottlenecks and ensures the system can handle incoming data without falling behind. |
| Throughput | Tracks the number of messages processed per micro-batch. Helps assess pipeline efficiency and confirm that processing keeps pace with data ingestion. |
| Duration | Measures the average duration of a micro-batch. Indicates processing speed and helps tune batch intervals. |
| Latency | Measures how long it takes for a record or message to be processed end to end. Helps you understand pipeline delays and optimize for lower latency. |
| Cluster utilization | Reflects CPU and memory usage (%). Ensures efficient resource use and helps scale clusters to meet processing demands. |
| Network | Measures data transferred and received. Useful for identifying network bottlenecks and improving data transfer performance. |
| Checkpoint | Identifies processed data and offsets. Ensures consistency and enables fault tolerance during failures. |
| Cost | Shows a streaming application's hourly, daily, and monthly costs. Aids in budgeting and resource optimization. |
| Lineage | Displays datasets and layers created in the streaming application. Facilitates data transformation, tracking, quality assurance, and debugging. |
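Several of these metrics surface directly in a Structured Streaming query's progress report. The following is a minimal sketch, assuming query is an active StreamingQuery handle returned by writeStream.start(); the keys shown match the progress JSON illustrated later in this article.

```python
# Inspect the most recent micro-batch progress of a running streaming query.
progress = query.lastProgress  # dict; None until the first micro-batch completes
if progress:
    print("Input rows:", progress["numInputRows"])                    # throughput per batch
    print("Processed rows/sec:", progress["processedRowsPerSecond"])  # processing rate
    print("Batch duration (ms):", progress["batchDuration"])          # duration
    for source in progress["sources"]:                                # backpressure / backlog
        print(source["description"], source.get("metrics", {}))
```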
Azure Databricks cluster logs and metrics offer detailed insights into cluster performance and resource utilization. These include CPU and memory usage, disk I/O, and network traffic. Monitoring these metrics is critical for:
- Optimizing cluster performance.
- Managing resources efficiently.
- Troubleshooting operational issues.
You can view these metrics in the Databricks UI or export them to external monitoring tools. See Notebook example: Datadog metrics.
The Spark UI shows detailed information about the progress of jobs and stages, including the number of tasks completed, pending, and failed. This helps you understand the execution flow and identify bottlenecks.
For streaming applications, the Streaming tab shows metrics such as input rate, processing rate, and batch duration. It helps you monitor your streaming jobs' performance and identify any data ingestion or processing issues.
See Debugging with the Apache Spark UI for more information.
Compute metrics help you understand cluster utilization. As your job runs, you can see how it scales and how resources are affected, and you can identify memory pressure that could lead to out-of-memory (OOM) failures or CPU pressure that could cause long delays. The following metrics are available:
- Server Load Distribution: Each node's CPU utilization over the past minute.
- CPU Utilization: The percentage of time the CPU spent in various modes (for example, user, system, idle, and iowait).
- Memory Utilization: Total memory usage by each mode (for example, used, free, buffer, and cached).
- Memory Swap Utilization: Total memory swap usage.
- Free Filesystem Space: Total filesystem usage by each mount point.
- Network Throughput: The number of bytes received and transmitted through the network by each device.
- Number of Active Nodes: The number of active nodes at every timestamp for the given compute.
See Monitor performance and Hardware metric charts for more information.
The Lakeflow Declarative Pipelines event log captures a comprehensive record of all pipeline events, including:
- Audit logs.
- Data quality checks.
- Pipeline progress.
- Data lineage.
The event log is automatically enabled for all Lakeflow Declarative Pipelines and can be accessed via:
- Pipeline UI: View logs directly.
- DLT API: Programmatic access.
- Direct query: Query the event log table.
For more information, see event log schema for Lakeflow Declarative Pipelines.
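The example queries below assume the event log is available as a table or view named event_log. As a minimal sketch, for a Unity Catalog-enabled pipeline you can expose the event log through the event_log table-valued function; the catalog, schema, and table names here are illustrative:

```sql
-- Illustrative sketch: expose the pipeline's event log under the name event_log.
-- Replace my_catalog.my_schema.my_table with a table published by your pipeline.
CREATE OR REPLACE TEMPORARY VIEW event_log AS
SELECT * FROM event_log(TABLE(my_catalog.my_schema.my_table));
```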
These example queries help monitor the performance and health of pipelines by providing key metrics such as batch duration, throughput, backpressure, and resource utilization.
This query calculates the average duration of batches processed by the pipeline.
SELECT
(max_t - min_t) / batch_count as avg_batch_duration_seconds,
batch_count,
min_t,
max_t,
date_hr,
message
FROM
-- /60 for minutes
(
SELECT
count(*) as batch_count,
unix_timestamp(
min(timestamp)
) as min_t,
unix_timestamp(
max(timestamp)
) as max_t,
date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
message
FROM
event_log
WHERE
event_type = 'flow_progress'
AND level = 'METRICS'
GROUP BY
date_hr,
message
)
ORDER BY
date_hr desc
This query calculates the average throughput of the pipeline in terms of processed rows per second.
SELECT
total_rows / (max_t - min_t) as avg_throughput_rps,
total_rows,
min_t,
max_t,
date_hr,
message
FROM
-- multiply by 60 for rows per minute
(
SELECT
sum(
details:flow_progress:metrics:num_output_rows
) as total_rows,
unix_timestamp(
min(timestamp)
) as min_t,
unix_timestamp(
max(timestamp)
) as max_t,
date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
message
FROM
event_log
WHERE
event_type = 'flow_progress'
AND level = 'METRICS'
GROUP BY
date_hr,
message
)
ORDER BY
date_hr desc
This query measures the pipeline's backpressure by checking the data backlog.
SELECT
timestamp,
DOUBLE(
details:flow_progress:metrics:backlog_bytes
) AS backlog_bytes,
DOUBLE(
details:flow_progress:metrics:backlog_files
) AS backlog_files
FROM
event_log
WHERE
event_type = 'flow_progress'
This query provides insight into the utilization of clusters or task slots used by the pipeline.
SELECT
date_trunc("hour", timestamp) AS hour,
AVG (
DOUBLE (
details:cluster_resources:num_task_slots
)
) AS num_task_slots,
AVG (
DOUBLE (
details:cluster_resources:avg_num_task_slots
)
) AS avg_num_task_slots,
AVG (
DOUBLE (
details:cluster_resources:num_executors
)
) AS num_executors,
AVG (
DOUBLE (
details:cluster_resources:avg_task_slot_utilization
)
) AS avg_utilization,
AVG (
DOUBLE (
details:cluster_resources:avg_num_queued_tasks
)
) AS queue_size
FROM
event_log
WHERE
details:cluster_resources:avg_num_queued_tasks IS NOT NULL
AND origin.update_id = '${latest_update_id}'
GROUP BY
1;
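The query above references a ${latest_update_id} substitution variable. As a minimal sketch, you can look up the most recent update ID from the same event log (assuming the event_log view described earlier):

```sql
-- Illustrative sketch: find the ID of the most recent pipeline update.
SELECT origin.update_id AS latest_update_id
FROM event_log
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
LIMIT 1;
```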
You can monitor streaming queries in jobs through the Streaming Query Listener.
Attach a listener to the Spark session to enable the Streaming Query Listener in Azure Databricks. This listener monitors the progress and metrics of your streaming queries, and it can push metrics to external monitoring tools or log them for further analysis.
::: note
This is available in Databricks Runtime 11.3 LTS and above for Python and Scala.
:::
You can export streaming metrics to external services for alerting or dashboarding by using the StreamingQueryListener interface.
Here is a basic example of how to implement a listener:
from pyspark.sql.streaming import StreamingQueryListener

class MyListener(StreamingQueryListener):
    # Called once when a streaming query starts.
    def onQueryStarted(self, event):
        print("Query started: ", event.id)

    # Called after each micro-batch; event.progress carries the metrics.
    def onQueryProgress(self, event):
        print("Query made progress: ", event.progress)

    # Called when the query stops or fails.
    def onQueryTerminated(self, event):
        print("Query terminated: ", event.id)

spark.streams.addListener(MyListener())
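Building on this pattern, a listener can forward each progress report to an external service. The following is a minimal sketch: the HTTP endpoint (METRICS_ENDPOINT) is a hypothetical placeholder rather than a specific monitoring product, and it assumes the requests library and the progress object's json property are available on your runtime.

```python
import requests
from pyspark.sql.streaming import StreamingQueryListener

METRICS_ENDPOINT = "https://example.com/metrics"  # hypothetical endpoint

class ExportingListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        pass

    def onQueryProgress(self, event):
        # event.progress.json is the progress report serialized as JSON
        try:
            requests.post(
                METRICS_ENDPOINT,
                data=event.progress.json,
                headers={"Content-Type": "application/json"},
                timeout=5,
            )
        except Exception as e:
            print("Failed to export streaming metrics:", e)

    def onQueryTerminated(self, event):
        pass

spark.streams.addListener(ExportingListener())
```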
Below is an example of a StreamingQueryListener event log for a Kafka to Delta Lake streaming query:
{
"id": "210f4746-7caa-4a51-bd08-87cabb45bdbe",
"runId": "42a2f990-c463-4a9c-9aae-95d6990e63f4",
"timestamp": "2024-05-15T21:57:50.782Z",
"batchId": 0,
"batchDuration": 3601,
"numInputRows": 20,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 5.55401277422938,
"durationMs": {
"addBatch": 1544,
"commitBatch": 686,
"commitOffsets": 27,
"getBatch": 12,
"latestOffset": 577,
"queryPlanning": 105,
"triggerExecution": 3600,
"walCommit": 34
},
"stateOperators": [
{
"operatorName": "symmetricHashJoin",
"numRowsTotal": 20,
"numRowsUpdated": 20,
"allUpdatesTimeMs": 473,
"numRowsRemoved": 0,
"allRemovalsTimeMs": 0,
"commitTimeMs": 277,
"memoryUsedBytes": 13120,
"numRowsDroppedByWatermark": 0,
"numShufflePartitions": 5,
"numStateStoreInstances": 20,
"customMetrics": {
"loadedMapCacheHitCount": 0,
"loadedMapCacheMissCount": 0,
"stateOnCurrentVersionSizeBytes": 5280
}
}
],
"sources": [
{
"description": "KafkaV2[Subscribe[topic-1]]",
"numInputRows": 10,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 2.77700638711469,
"metrics": {
"avgOffsetsBehindLatest": "0.0",
"estimatedTotalBytesBehindLatest": "0.0",
"maxOffsetsBehindLatest": "0",
"minOffsetsBehindLatest": "0"
}
},
{
"description": "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
"numInputRows": 10,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 2.77700638711469,
"metrics": {
"numBytesOutstanding": "0",
"numFilesOutstanding": "0"
}
}
]
}
For more examples, see Examples.
Query progress metrics are essential for monitoring the performance and health of your streaming queries. These metrics include the number of input rows, processing rates, and various durations related to query execution. You can observe these metrics by attaching a StreamingQueryListener to the Spark session. The listener emits events containing these metrics at the end of each streaming epoch.
For example, you can access metrics using the StreamingQueryProgress.observedMetrics map in the listener's onQueryProgress method. This allows you to track and analyze the performance of your streaming queries in real time.
class MyListener(StreamingQueryListener):
    def onQueryProgress(self, event):
        # observedMetrics holds the named metrics defined with DataFrame.observe()
        print("Query made progress: ", event.progress.observedMetrics)