How to explore Apache Spark metrics with Spark listeners

Apache Spark provides several useful internal listeners that track metrics about tasks and jobs. During the development cycle, for example, these metrics can help you understand when and why a task takes a long time to finish. Of course, you can leverage the Spark UI or History UI to see information for each task and stage, but there are some downsides. For instance, you can't compare the statistics for two Spark jobs side by side, and the Spark History UI can take a long time to load for large Spark jobs.

You can extract the metrics generated by Spark internal classes and persist them to disk as a table or a DataFrame. Then you can query the DataFrame just like any other table.

You can use this SparkTaskMetrics package to explore how to use Spark listeners to extract metrics from tasks and jobs.
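Under the hood, the package relies on Spark's SparkListener interface, which fires a callback each time a task finishes. The following is a minimal sketch (not the actual SparkTaskMetrics implementation) of how such a listener can capture task metrics; the class name and the choice of fields collected are illustrative assumptions.

```scala
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Hypothetical listener: collects one (stageId, duration, executorRunTime)
// tuple per finished task.
class SimpleTaskMetricsListener extends SparkListener {
  val rows = ArrayBuffer[(Int, Long, Long)]()

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val info    = taskEnd.taskInfo
    val metrics = taskEnd.taskMetrics
    rows += ((taskEnd.stageId, info.duration, metrics.executorRunTime))
  }
}

// Register the listener before running a job, then read `rows` afterwards:
// val listener = new SimpleTaskMetricsListener
// spark.sparkContext.addSparkListener(listener)
```

The package wraps this same mechanism and surfaces the collected metrics as a DataFrame instead of a raw buffer.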

Build the Spark Metrics package

Use the following command to build the package:

sbt package
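The package ships with its own sbt build definition. For orientation only, a typical build.sbt for a Spark library looks roughly like the sketch below; the name, Scala version, and Spark version here are assumptions, not the package's actual settings.

```scala
// Hypothetical build.sbt sketch for a Spark library built with `sbt package`.
name := "spark-task-metrics"
scalaVersion := "2.12.15"
// "provided" because the Spark runtime supplies these jars on the cluster.
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.3.0" % "provided"
```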

Gather metrics

Import TaskMetricsExplorer. Create the query sql("""SELECT * FROM nested_data""").show(false) and pass it into runAndMeasure. The query should include at least one Spark action in order to trigger a Spark job; Spark does not generate any metrics until a Spark job is executed.

For example:

import com.databricks.TaskMetricsExplorer

val t = new TaskMetricsExplorer(spark)
sql("""CREATE OR REPLACE TEMPORARY VIEW nested_data AS
       SELECT id AS key,
       ARRAY(CAST(RAND(1) * 100 AS INT), CAST(RAND(2) * 100 AS INT), CAST(RAND(3) * 100 AS INT), CAST(RAND(4) * 100 AS INT), CAST(RAND(5) * 100 AS INT)) AS values,
       ARRAY(ARRAY(CAST(RAND(1) * 100 AS INT), CAST(RAND(2) * 100 AS INT)), ARRAY(CAST(RAND(3) * 100 AS INT), CAST(RAND(4) * 100 AS INT), CAST(RAND(5) * 100 AS INT))) AS nested_values
       FROM range(5)""")
val query = sql("""SELECT * FROM nested_data""").show(false)
val res = t.runAndMeasure(query)

The runAndMeasure method runs the query and uses a Spark listener to capture the task's internal metrics while it executes. It then returns the query result:

+---+-------------------+-----------------------+
|key|values             |nested_values          |
+---+-------------------+-----------------------+
|0  |[26, 11, 66, 8, 47]|[[26, 11], [66, 8, 47]]|
|1  |[66, 8, 47, 91, 6] |[[66, 8], [47, 91, 6]] |
|2  |[8, 47, 91, 6, 70] |[[8, 47], [91, 6, 70]] |
|3  |[91, 6, 70, 41, 19]|[[91, 6], [70, 41, 19]]|
|4  |[6, 70, 41, 19, 12]|[[6, 70], [41, 19, 12]]|
+---+-------------------+-----------------------+

The task metrics information is saved in a DataFrame. You can display it with this command:

res.select($"stageId", $"taskType", $"taskLocality", $"executorRunTime", $"duration", $"executorId", $"host", $"jvmGCTime").show(false)

Then you will get:

+-------+----------+-------------+---------------+--------+----------+---------+---------+
|stageId|taskType  |taskLocality |executorRunTime|duration|executorId| host    |jvmGCTime|
+-------+----------+-------------+---------------+--------+----------+---------+---------+
|3      |ResultTask|PROCESS_LOCAL|2              |9       |driver    |localhost|0        |
|4      |ResultTask|PROCESS_LOCAL|3              |11      |driver    |localhost|0        |
|4      |ResultTask|PROCESS_LOCAL|3              |16      |driver    |localhost|0        |
|4      |ResultTask|PROCESS_LOCAL|2              |20      |driver    |localhost|0        |
|4      |ResultTask|PROCESS_LOCAL|4              |22      |driver    |localhost|0        |
|5      |ResultTask|PROCESS_LOCAL|2              |12      |driver    |localhost|0        |
|5      |ResultTask|PROCESS_LOCAL|3              |17      |driver    |localhost|0        |
|5      |ResultTask|PROCESS_LOCAL|7              |21      |driver    |localhost|0        |
+-------+----------+-------------+---------------+--------+----------+---------+---------+
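Because res is an ordinary DataFrame, you can aggregate the metrics with the usual DataFrame API, for example to summarize task durations per stage. This is a usage sketch, not part of the package; the column names come from the res schema shown in this article.

```scala
import org.apache.spark.sql.functions.{avg, count, max, sum}

// Per-stage summary: task count, average and worst-case task duration,
// and total time spent in JVM garbage collection.
res.groupBy($"stageId")
  .agg(
    count($"taskId").as("numTasks"),
    avg($"duration").as("avgDurationMs"),
    max($"duration").as("maxDurationMs"),
    sum($"jvmGCTime").as("totalGcMs"))
  .orderBy($"stageId")
  .show(false)
```

A skewed stage shows up here as a large gap between avgDurationMs and maxDurationMs.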

To view all available metrics names and data types, display the schema of the res DataFrame:

res.schema.treeString
root
 |-- stageId: integer (nullable = false)
 |-- stageAttemptId: integer (nullable = false)
 |-- taskType: string (nullable = true)
 |-- index: long (nullable = false)
 |-- taskId: long (nullable = false)
 |-- attemptNumber: integer (nullable = false)
 |-- launchTime: long (nullable = false)
 |-- finishTime: long (nullable = false)
 |-- duration: long (nullable = false)
 |-- schedulerDelay: long (nullable = false)
 |-- executorId: string (nullable = true)
 |-- host: string (nullable = true)
 |-- taskLocality: string (nullable = true)
 |-- speculative: boolean (nullable = false)
 |-- gettingResultTime: long (nullable = false)
 |-- successful: boolean (nullable = false)
 |-- executorRunTime: long (nullable = false)
 |-- executorCpuTime: long (nullable = false)
 |-- executorDeserializeTime: long (nullable = false)
 |-- executorDeserializeCpuTime: long (nullable = false)
 |-- resultSerializationTime: long (nullable = false)
 |-- jvmGCTime: long (nullable = false)
 |-- resultSize: long (nullable = false)
 |-- numUpdatedBlockStatuses: integer (nullable = false)
 |-- diskBytesSpilled: long (nullable = false)
 |-- memoryBytesSpilled: long (nullable = false)
 |-- peakExecutionMemory: long (nullable = false)
 |-- recordsRead: long (nullable = false)
 |-- bytesRead: long (nullable = false)
 |-- recordsWritten: long (nullable = false)
 |-- bytesWritten: long (nullable = false)
 |-- shuffleFetchWaitTime: long (nullable = false)
 |-- shuffleTotalBytesRead: long (nullable = false)
 |-- shuffleTotalBlocksFetched: long (nullable = false)
 |-- shuffleLocalBlocksFetched: long (nullable = false)
 |-- shuffleRemoteBlocksFetched: long (nullable = false)
 |-- shuffleWriteTime: long (nullable = false)
 |-- shuffleBytesWritten: long (nullable = false)
 |-- shuffleRecordsWritten: long (nullable = false)
 |-- errorMessage: string (nullable = true)
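To compare two Spark jobs side by side, which the History UI does not support, one approach (an assumption for illustration, not part of the package) is to tag each run with a label and append the metrics to a shared table:

```scala
import org.apache.spark.sql.functions.lit

// Tag this run with a hypothetical label and append it to a history table.
res.withColumn("runId", lit("baseline"))
  .write
  .mode("append")
  .saveAsTable("task_metrics_history")

// Later, compare runs directly in SQL:
// spark.sql("""SELECT runId, stageId, avg(duration) AS avgDurationMs
//              FROM task_metrics_history
//              GROUP BY runId, stageId
//              ORDER BY stageId, runId""").show(false)
```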