Handling large queries in interactive workflows

One challenge in interactive data workflows is handling large queries. These include queries that generate too many output rows, fetch many external partitions, or compute on extremely large data sets. Such queries can be extremely slow, saturate compute resources, and make it difficult for others to share the same compute.

Query Watchdog is a process that prevents queries from monopolizing compute resources. It checks for the most common causes of large queries and terminates queries that exceed a configured threshold. This article describes how to enable and configure Query Watchdog.

Important

Query Watchdog is enabled on every all-purpose compute resource created using the UI.

Example of a disruptive query

An analyst is running some ad hoc queries in a just-in-time data warehouse. The analyst uses shared autoscaling compute, which lets multiple users work on the same compute at the same time. Suppose there are two tables that each have one million rows.

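import org.apache.spark.sql.functions._

// Use a small number of shuffle partitions for this example.
spark.conf.set("spark.sql.shuffle.partitions", 10)

// Two one-million-row tables in which every row has the same join key: a single space.
spark.range(1000000)
  .withColumn("join_key", lit(" "))
  .createOrReplaceTempView("table_x")
spark.range(1000000)
  .withColumn("join_key", lit(" "))
  .createOrReplaceTempView("table_y")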

These table sizes are manageable in Apache Spark. However, each table includes a join_key column that holds the same blank value, a single space (" "), in every row. This can happen if the data is not perfectly clean or if there is significant data skew, where some keys are far more prevalent than others. Here, the blank join key is far more prevalent than any other value.

In the following code, the analyst joins these two tables on their keys. Because every row on both sides shares the same key, the join produces one trillion (1,000,000 × 1,000,000) output rows, and all of them are produced on a single executor (the executor that receives the " " key):

SELECT
  id, count(id)
FROM
  (SELECT
    x.id
  FROM
    table_x x
  JOIN
    table_y y
  on x.join_key = y.join_key)
GROUP BY id

This query appears to be running. But without knowing about the data, the analyst sees only that a single task remains for the rest of the job. The query never finishes, leaving the analyst frustrated and confused about why it did not work.

In this case there is only one problematic join key. Other times there may be many more.
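The one-trillion-row figure follows from simple arithmetic: an inner equi-join emits leftCount × rightCount rows for every key present on both sides. The sketch below illustrates that, and shows why skew matters by comparing the example tables with the same row counts spread over 1,000 keys. JoinSizeModel, joinOutputRows, and heaviestKeyOutput are hypothetical helpers for illustration only, not Spark or Databricks APIs; they can run as a Scala script or notebook cell.

```scala
// Hypothetical helper for illustration only (not a Spark or Databricks API).
// An inner equi-join emits leftCount * rightCount rows for every key that
// appears in both tables.
object JoinSizeModel {
  // Total output rows of an inner join, given per-key row counts for each table.
  def joinOutputRows(left: Map[String, Long], right: Map[String, Long]): Long =
    left.iterator.map { case (key, n) => n * right.getOrElse(key, 0L) }.sum

  // Output rows produced by the single heaviest key. Hash partitioning sends
  // all rows with the same key to the same task, so one task must produce
  // at least this many rows.
  def heaviestKeyOutput(left: Map[String, Long], right: Map[String, Long]): Long =
    left.iterator.map { case (key, n) => n * right.getOrElse(key, 0L) }.foldLeft(0L)(_ max _)
}

// The example tables: one million rows each, all with the join key " ".
val skewed = Map(" " -> 1000000L)
println(JoinSizeModel.joinOutputRows(skewed, skewed))      // 1000000000000
println(JoinSizeModel.heaviestKeyOutput(skewed, skewed))   // 1000000000000

// For comparison: the same row counts spread evenly over 1,000 keys.
val uniform = (0 until 1000).map(i => s"k$i" -> 1000L).toMap
println(JoinSizeModel.joinOutputRows(uniform, uniform))    // 1000000000
println(JoinSizeModel.heaviestKeyOutput(uniform, uniform)) // 1000000
```

With a single key, the entire trillion-row output lands on one task; with evenly spread keys, the total is a thousand times smaller and the heaviest key accounts for only a million rows.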

Enable and configure Query Watchdog

To enable and configure Query Watchdog, use the following properties:

  • Enable Query Watchdog with spark.databricks.queryWatchdog.enabled.
  • Configure the minimum task runtime with spark.databricks.queryWatchdog.minTimeSecs.
  • Configure the minimum number of output rows with spark.databricks.queryWatchdog.minOutputRows.
  • Configure the output ratio with spark.databricks.queryWatchdog.outputRatioThreshold.
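One way to keep the four properties listed above together is to collect them in a single sequence and apply them through a setter function. This is a sketch, not an official pattern: WatchdogSettings and applyAll are hypothetical names, and the values are the example values used in this article. Passing the setter as a parameter keeps the helper testable without a Spark session.

```scala
// Hypothetical helper for illustration only. Values are the examples used in
// this article; tune them for your workload.
object WatchdogSettings {
  val settings: Seq[(String, String)] = Seq(
    "spark.databricks.queryWatchdog.enabled"              -> "true",
    "spark.databricks.queryWatchdog.outputRatioThreshold" -> "1000",
    "spark.databricks.queryWatchdog.minTimeSecs"          -> "10",
    "spark.databricks.queryWatchdog.minOutputRows"        -> "100000"
  )

  // `set` is any key/value setter. In a Scala notebook, for example:
  //   WatchdogSettings.applyAll((k, v) => spark.conf.set(k, v))
  def applyAll(set: (String, String) => Unit): Unit =
    settings.foreach { case (key, value) => set(key, value) }
}
```

In a notebook, WatchdogSettings.applyAll((k, v) => spark.conf.set(k, v)) applies all four settings to the current session only.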

To prevent a query from creating too many output rows for the number of input rows, you can enable Query Watchdog and configure the maximum number of output rows as a multiple of the number of input rows. This example uses a ratio of 1000 (the default).

spark.conf.set("spark.databricks.queryWatchdog.enabled", true)
spark.conf.set("spark.databricks.queryWatchdog.outputRatioThreshold", 1000L)

The latter configuration declares that any given task should never produce more than 1000 times the number of input rows.
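The ratio rule above can be expressed as a small calculation. The sketch below is a simplified, hypothetical model (OutputRatioModel is not Databricks code, and the real check may differ in its details). It also assumes that the task handling the " " key in the earlier example reads one million rows from each table, that is, two million input rows, and emits the one trillion join rows.

```scala
// Hypothetical model of the output ratio rule, for illustration only; it is
// not Databricks code, and the real check may differ in its details.
object OutputRatioModel {
  // Most rows a task may emit before exceeding the ratio.
  def maxAllowedOutputRows(inputRows: Long, outputRatioThreshold: Long): Long =
    inputRows * outputRatioThreshold

  // True when a task has produced more rows than the ratio allows.
  def exceedsRatio(inputRows: Long, outputRows: Long, outputRatioThreshold: Long): Boolean =
    outputRows > maxAllowedOutputRows(inputRows, outputRatioThreshold)
}

// Assumed profile of the skewed task: 2,000,000 input rows, 10^12 output rows.
println(OutputRatioModel.maxAllowedOutputRows(2000000L, 1000L))         // 2000000000
println(OutputRatioModel.exceedsRatio(2000000L, 1000000000000L, 1000L)) // true
```

Under these assumptions, the skewed task is allowed two billion output rows but would need to produce a trillion, so it crosses the threshold long before it could finish.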

Tip

The output ratio is completely customizable. We recommend starting lower and seeing what threshold works well for you and your team. A range of 1,000 to 10,000 is a good starting point.

Query Watchdog not only prevents users from monopolizing compute resources with jobs that will never complete, it also saves time by failing those queries quickly. For example, the following query fails after several minutes because it exceeds the ratio.

SELECT
  z.id,
  join_key,
  sum(z.id),
  count(z.id)
FROM
  (SELECT
    x.id,
    y.join_key
  FROM
    table_x x
  JOIN
    table_y y
  on x.join_key = y.join_key) z
GROUP BY join_key, z.id

Here's what you would see:

[Screenshot: Query Watchdog]

It's usually enough to enable Query Watchdog and set the output/input threshold ratio, but you also have the option to set two additional properties: spark.databricks.queryWatchdog.minTimeSecs and spark.databricks.queryWatchdog.minOutputRows. These properties specify the minimum time a task in a query must run, and the minimum number of output rows it must produce, before Query Watchdog can cancel the query.

For example, you can set minTimeSecs to a higher value if you want to give a query a chance to produce a large number of rows per task. Likewise, you can set spark.databricks.queryWatchdog.minOutputRows to ten million if you want to stop a query only after a task in that query has produced ten million rows. Below that, the query succeeds even if the output/input ratio is exceeded. The following settings use a minimum of 10 seconds and 100,000 output rows:

spark.conf.set("spark.databricks.queryWatchdog.minTimeSecs", 10L)
spark.conf.set("spark.databricks.queryWatchdog.minOutputRows", 100000L)
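Putting the three thresholds together, the behavior described above can be modeled as a single decision: a task is cancelled only when it has run for at least minTimeSecs, has produced at least minOutputRows, and has exceeded the output ratio. The sketch below is a hypothetical model for illustration (WatchdogDecisionModel, Thresholds, TaskStats, and shouldCancel are not Databricks names), and the comparison operators (>= versus >) are assumptions.

```scala
// Hypothetical decision model, for illustration only; it is not Databricks
// code. A task is cancelled only when all three conditions hold.
object WatchdogDecisionModel {
  final case class Thresholds(outputRatioThreshold: Long, minTimeSecs: Long, minOutputRows: Long)
  final case class TaskStats(inputRows: Long, outputRows: Long, runtimeSecs: Long)

  def shouldCancel(task: TaskStats, t: Thresholds): Boolean = {
    val ranLongEnough  = task.runtimeSecs >= t.minTimeSecs                       // assumed >=
    val producedEnough = task.outputRows >= t.minOutputRows                      // assumed >=
    val ratioExceeded  = task.outputRows > task.inputRows * t.outputRatioThreshold
    ranLongEnough && producedEnough && ratioExceeded
  }
}

import WatchdogDecisionModel._

// Values from the settings above, plus the default ratio of 1000.
val limits = Thresholds(outputRatioThreshold = 1000L, minTimeSecs = 10L, minOutputRows = 100000L)

// Ratio exceeded (90,000 > 50 * 1000) but under minOutputRows: not cancelled.
println(shouldCancel(TaskStats(inputRows = 50L, outputRows = 90000L, runtimeSecs = 60L), limits)) // false
// Enough rows and ratio exceeded, but only 5 seconds in: not cancelled yet.
println(shouldCancel(TaskStats(50L, 200000L, 5L), limits))  // false
// All three conditions hold: cancelled.
println(shouldCancel(TaskStats(50L, 200000L, 30L), limits)) // true
```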

Tip

If you configure Query Watchdog in a notebook, the configuration does not persist across compute restarts. If you want to configure Query Watchdog for all users of a compute, we recommend that you use a compute configuration.

Detect queries on extremely large datasets

Another typical large query scans a large amount of data from big tables or datasets. The scan can run for a long time and saturate compute resources (even reading the metadata of a big Hive table can take a significant amount of time). You can set spark.databricks.queryWatchdog.maxHivePartitions to prevent fetching too many partitions from a big Hive table. Similarly, you can set spark.databricks.queryWatchdog.maxQueryTasks to limit queries on an extremely large dataset.

spark.conf.set("spark.databricks.queryWatchdog.maxHivePartitions", 20000)
spark.conf.set("spark.databricks.queryWatchdog.maxQueryTasks", 20000)
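These two limits are independent counts: one caps the Hive partitions a query fetches, the other caps the number of tasks in a query. The sketch below is a hypothetical model (ScanLimitModel, Limits, PlannedQuery, and exceededLimits are illustrative names, not Databricks code) that reports which limit a planned query would exceed; treating "exceed" as a strict greater-than comparison is an assumption.

```scala
// Hypothetical model, for illustration only; it is not Databricks code.
// Reports which of the two limits described above a planned query exceeds.
object ScanLimitModel {
  final case class Limits(maxHivePartitions: Long, maxQueryTasks: Long)
  final case class PlannedQuery(hivePartitionsToFetch: Long, numTasks: Long)

  def exceededLimits(q: PlannedQuery, limits: Limits): List[String] = {
    val partitionCheck =
      if (q.hivePartitionsToFetch > limits.maxHivePartitions)
        List("spark.databricks.queryWatchdog.maxHivePartitions")
      else Nil
    val taskCheck =
      if (q.numTasks > limits.maxQueryTasks)
        List("spark.databricks.queryWatchdog.maxQueryTasks")
      else Nil
    partitionCheck ++ taskCheck
  }
}

// Limits matching the settings above.
val limits = ScanLimitModel.Limits(maxHivePartitions = 20000L, maxQueryTasks = 20000L)
println(ScanLimitModel.exceededLimits(ScanLimitModel.PlannedQuery(hivePartitionsToFetch = 50000L, numTasks = 800L), limits))
// List(spark.databricks.queryWatchdog.maxHivePartitions)
```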

When should you enable Query Watchdog?

Query Watchdog should be enabled for ad hoc analytics compute where SQL analysts and data scientists are sharing a given compute and an administrator needs to make sure that queries "play nicely" with one another.

When should you disable Query Watchdog?

In general we do not advise eagerly cancelling queries used in an ETL scenario because there typically isn't a human in the loop to correct the error. We recommend that you disable Query Watchdog for all but ad hoc analytics compute.