Handling large queries in interactive workflows

A challenge with interactive data workflows is handling large queries. This includes queries that generate too many output rows, fetch many external partitions, or compute on extremely large data sets. These queries can be extremely slow, saturate cluster resources, and make it difficult for others to share the same cluster.

Query Watchdog is a process that prevents queries from monopolizing cluster resources by examining the most common causes of large queries and terminating queries that pass a threshold. This article describes how to enable and configure Query Watchdog.

Important

Query Watchdog is enabled for all all-purpose clusters created using the UI.

Example of a disruptive query

An analyst is performing some ad hoc queries in a just-in-time data warehouse. The analyst uses a shared autoscaling cluster, which makes it easy for multiple users to use a single cluster at the same time. Suppose there are two tables that each have a million rows.

import org.apache.spark.sql.functions._
spark.conf.set("spark.sql.shuffle.partitions", 10)

spark.range(1000000)
  .withColumn("join_key", lit(" "))
  .createOrReplaceTempView("table_x")
spark.range(1000000)
  .withColumn("join_key", lit(" "))
  .createOrReplaceTempView("table_y")

These table sizes are manageable in Apache Spark. However, they each include a join_key column containing the same blank string (" ") in every row. This can happen if the data is not perfectly clean or if there is significant data skew where some keys are more prevalent than others. These blank join keys are far more prevalent than any other value.
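At a tiny scale, the skew is easy to see by counting key frequencies. The sketch below is plain Scala (no Spark required), with the row count shrunk to 1,000 for illustration:

```scala
// Miniature version of table_x: every row carries the same blank join key.
val tableX: Seq[(Long, String)] = (1L to 1000L).map(id => (id, " "))

// Count how many rows share each join key.
val keyCounts: Map[String, Int] =
  tableX.groupBy(_._2).map { case (k, rows) => (k, rows.size) }

// A single key accounts for 100% of the rows: extreme skew.
println(keyCounts(" ")) // 1000
```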

In the following code, the analyst joins these two tables on their keys, which produces one trillion results, all of them generated on a single executor (the executor that gets the " " key):

SELECT
  id, count(*)
FROM
  (SELECT
    x.id
  FROM
    table_x x
  JOIN
    table_y y
  ON x.join_key = y.join_key)
GROUP BY id

This query appears to be running. But without knowing about the data, the analyst observes that there is “only” a single task left over the course of executing the job. The query never finishes, leaving the analyst frustrated and confused about why it did not work.

In this case there is only one problematic join key. Other times there may be many more.
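The blow-up itself is just join arithmetic: if every row shares one key, an n-row by n-row join yields n² output pairs. A plain Scala sketch at a 1,000-row scale (not Spark code) reproduces the effect:

```scala
// Two 1,000-row collections where every row has the same blank join key.
val left  = (1L to 1000L).map(id => (id, " "))
val right = (1L to 1000L).map(id => (id, " "))

// A naive nested-loop join on the key: every left row matches every right row.
val joined = for {
  (lid, lk) <- left
  (rid, rk) <- right
  if lk == rk
} yield (lid, rid)

// 1,000 x 1,000 input rows produce 1,000,000 output rows.
// At the article's million-row scale this becomes 10^12 rows.
println(joined.size) // 1000000
```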

Enable and configure Query Watchdog

To prevent a query from creating too many output rows for the number of input rows, you can enable Query Watchdog and configure the maximum number of output rows as a multiple of the number of input rows. This example uses a ratio of 1000 (the default).

spark.conf.set("spark.databricks.queryWatchdog.enabled", true)
spark.conf.set("spark.databricks.queryWatchdog.outputRatioThreshold", 1000L)

The latter configuration declares that any given task should never produce more than 1000 times the number of input rows.
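Conceptually, the rule compares a task's output row count against its input row count. The helper below is an illustrative sketch of that arithmetic, not the actual Query Watchdog implementation:

```scala
// Hypothetical helper illustrating the output-ratio rule (illustrative only).
def exceedsOutputRatio(inputRows: Long, outputRows: Long,
                       threshold: Long = 1000L): Boolean =
  outputRows > inputRows * threshold

// The skewed join above: ~2 million input rows, ~1 trillion output rows.
println(exceedsOutputRatio(2000000L, 1000000000000L)) // true: 500,000x the input
println(exceedsOutputRatio(2000000L, 1000000000L))    // false: ratio is only 500
```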

Tip

The output ratio is completely customizable. We recommend starting lower and seeing what threshold works well for you and your team. A range of 1,000 to 10,000 is a good starting point.

Not only does Query Watchdog prevent users from monopolizing cluster resources for jobs that will never complete, it also saves time by fast-failing a query that would never have completed. For example, the following query fails after several minutes because it exceeds the ratio.

SELECT
  join_key,
  sum(id),
  count(*)
FROM
  (SELECT
    x.id,
    y.join_key
  FROM
    table_x x
  JOIN
    table_y y
  ON x.join_key = y.join_key)
GROUP BY join_key

Here's what you would see:

(Image: Query Watchdog cancellation message)

It's usually enough to enable Query Watchdog and set the output/input threshold ratio, but you also have the option to set two additional properties: spark.databricks.queryWatchdog.minTimeSecs and spark.databricks.queryWatchdog.minOutputRows. These properties specify the minimum time a given task in a query must run before it can be cancelled, and the minimum number of output rows for a task in that query.

For example, you can set minTimeSecs to a higher value if you want to give a task a chance to produce a large number of rows. Likewise, you can set spark.databricks.queryWatchdog.minOutputRows to ten million if you want to stop a query only after a task in that query has produced ten million rows. Anything less and the query succeeds, even if the output/input ratio was exceeded.

spark.conf.set("spark.databricks.queryWatchdog.minTimeSecs", 10L)
spark.conf.set("spark.databricks.queryWatchdog.minOutputRows", 100000L)
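Taken together, the three settings mean a task is only a cancellation candidate once it has run long enough, produced enough rows, and exceeded the output ratio. The sketch below illustrates that combined condition; the helper name and logic are illustrative assumptions, not the real Query Watchdog internals:

```scala
// Hypothetical sketch of how the three thresholds combine (illustrative only).
def shouldCancel(runtimeSecs: Long, inputRows: Long, outputRows: Long,
                 minTimeSecs: Long = 10L,
                 minOutputRows: Long = 100000L,
                 outputRatioThreshold: Long = 1000L): Boolean =
  runtimeSecs >= minTimeSecs &&
    outputRows >= minOutputRows &&
    outputRows > inputRows * outputRatioThreshold

// Ratio exceeded, but the task has not yet produced minOutputRows: keep running.
println(shouldCancel(runtimeSecs = 60, inputRows = 10, outputRows = 50000))  // false

// All three conditions met: the task is a cancellation candidate.
println(shouldCancel(runtimeSecs = 60, inputRows = 10, outputRows = 200000)) // true
```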

Tip

If you configure Query Watchdog in a notebook, the configuration does not persist across cluster restarts. If you want to configure Query Watchdog for all users of a cluster, we recommend that you use a cluster configuration.

Detect queries on extremely large datasets

Another typical large query may scan a large amount of data from big tables or datasets. The scan operation may last a long time and saturate cluster resources (even reading the metadata of a big Hive table can take a significant amount of time). You can set maxHivePartitions to prevent fetching too many partitions from a big Hive table. Similarly, you can also set maxQueryTasks to limit queries on an extremely large dataset.

spark.conf.set("spark.databricks.queryWatchdog.maxHivePartitions", 20000)
spark.conf.set("spark.databricks.queryWatchdog.maxQueryTasks", 20000)

When should you enable Query Watchdog?

Query Watchdog should be enabled for ad hoc analytics clusters where SQL analysts and data scientists share a given cluster and an administrator needs to make sure that queries “play nicely” with one another.

When should you disable Query Watchdog?

In general we do not advise eagerly cancelling queries used in an ETL scenario, because there typically isn't a human in the loop to correct the error. We recommend that you disable Query Watchdog for all but ad hoc analytics clusters.