How to use Apache Spark metrics

This article gives an example of how to monitor Apache Spark components using the Spark configurable metrics system. Specifically, it shows how to set up a new source and enable a sink.

For detailed information about the Spark components available for metrics collection, including the sinks supported out of the box, see the Spark monitoring documentation (https://spark.apache.org/docs/latest/monitoring.html).

Note

There are several other ways to collect metrics that provide insight into how a Spark job is performing, which are also not covered in this article:

  • SparkStatusTracker (Source, API): monitor job, stage, or task progress
  • StreamingQueryListener (Source, API): intercept streaming events
  • SparkListener: intercept events from the Spark scheduler

For information about using other third-party tools to monitor Spark jobs in Azure Databricks, see Metrics.

How does this metrics collection system work? Upon instantiation, each executor creates a connection to the driver to pass metrics.

The first step is to write a class that extends the Source trait:

import com.codahale.metrics.{Counter, Histogram, MetricRegistry}
import org.apache.spark.metrics.source.Source

// Note: the Source trait is private[spark], so this class must be declared
// inside the org.apache.spark package hierarchy, for example in
// package org.apache.spark.metrics.source.
class MySource extends Source {
  override val sourceName: String = "MySource"

  override val metricRegistry: MetricRegistry = new MetricRegistry

  // The instruments this source exposes through the metrics system.
  val FOO: Histogram = metricRegistry.histogram(MetricRegistry.name("fooHistory"))
  val FOO_COUNTER: Counter = metricRegistry.counter(MetricRegistry.name("fooCounter"))
}

The next step is to enable the sink. In this example, the metrics are printed to the console:

import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession
  .builder
  .master("local[*]")
  .appName("MySourceDemo")
  .config("spark.driver.host", "localhost")
  // Route the metrics of all instances to the console sink.
  .config("spark.metrics.conf.*.sink.console.class", "org.apache.spark.metrics.sink.ConsoleSink")
  .getOrCreate()
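
Equivalently, the sink can be enabled through a metrics configuration file instead of session configs. A minimal sketch of such a metrics.properties file, using the console sink settings from Spark's bundled metrics.properties.template (the period and unit lines are optional and shown with their documented defaults):

# Enable the console sink for all instances (master, worker, driver, executor).
*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
# How often the console sink polls and prints metrics.
*.sink.console.period=10
*.sink.console.unit=seconds

Point Spark at the file with the spark.metrics.conf property, for example --conf spark.metrics.conf=/path/to/metrics.properties.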

Note

To sink metrics to Prometheus, you can use this third-party library: https://github.com/banzaicloud/spark-metrics.
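
As a rough sketch only: assuming the library's sink class and Pushgateway setting match its README (verify the exact keys there before use), the configuration follows the same pattern as the console sink above:

# Assumed class and property names from the banzaicloud/spark-metrics README;
# check the library's documentation for the keys in your version.
*.sink.prometheus.class=com.banzaicloud.spark.metrics.sink.PrometheusSink
*.sink.prometheus.pushgateway-address=<pushgateway-host>:9091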

The last step is to instantiate the source and register it with SparkEnv:

import org.apache.spark.SparkEnv

val source: MySource = new MySource
SparkEnv.get.metricsSystem.registerSource(source)
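
Once the source is registered, any values recorded against its instruments are reported through the enabled sink. A minimal sketch using the FOO and FOO_COUNTER instruments declared in MySource above:

// Record values on the driver; the console sink prints them on its next poll.
source.FOO_COUNTER.inc()
source.FOO.update(spark.range(1000).count())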

You can view a complete, buildable example at https://github.com/newroyker/meter.