最佳做法:使用 Kinesis 的结构化流式处理
本文介绍将 Kinesis 用作流式源与 Delta Lake 和 Apache Spark 结构化流式处理结合使用时的最佳做法。
Amazon Kinesis 数据流 (KDS) 是一种可大规模缩放且持久的实时数据流式处理服务。 KDS 每秒从数十万个源(例如网站点击流、数据库事件流、金融交易、社交媒体源、IT 日志和位置跟踪事件)持续捕获千兆字节的数据。
KDS 是 AWS 上流式处理数据服务的常用选择,因为它易于使用,具有无服务器设置。 Kinesis 数据流由单个吞吐量单位(称为分片)组成,根据分片小时数和 PUT 有效负载单位计费。 每个分片的估计引入容量为 1000 条记录/秒或 1MB/秒,输出速率为 2MB/秒。
在 KDS 中收集数据后,可以将 Apache Spark 结构化流式处理与 Delta Lake 的深度集成用于日志分析、点击流分析和实时指标等应用程序。 可以持续处理数据并将其存储到 Delta 表中。 下图描述了这些用例的典型体系结构:
Databricks Kinesis 结构化流式处理源
Databricks Runtime 包含一个现成的 Kinesis 源。 这是 KDS 的专有连接器,未提供开放源代码。 此连接器不基于 Kinesis 客户端库 (KCL)。 Kinesis 源体系结构如图所示:
关键技术注意事项和最佳做法
本部分包括有关将 Kinesis 与 Delta Lake 结合使用的最佳做法和故障排除信息。
优化预提取
Kinesis 源在后台线程中运行 Spark 作业,以定期预提取 Kinesis 数据并将其缓存在 Spark 执行程序的内存中。 流式处理查询在每个预提取步骤完成后处理缓存数据,使数据可用于处理。 预提取步骤显着影响观察到的端到端延迟和吞吐量。 可以使用本部分中介绍的选项控制性能。
shardsPerTask
配置参数的默认设置为 5。 然而,大规模处理时,可能需要大量的 CPU 核心,因此设置为 10 可能是一个很好的起点。 然后,根据流式处理工作负载和数据量的复杂性,可以根据群集的 Ganglia 指标(CPU、内存、网络等)调整此值。 例如,受 CPU 约束的群集可能需要一个较小的值,用更多的核心来补偿。
若要优化最小查询延迟和最大资源使用率,请使用以下计算:
total number of CPU cores in the cluster (across all executors)
>= total number of Kinesis shards
/ shardsPerTask
。
下表介绍了用于确定每次从 Kinesis 预提取时读取的数据量的参数。
选项 | 值 | 默认 | 说明 |
---|---|---|---|
maxRecordsPerFetch |
整数 | 10000 | 每个 getRecords API 调用要提取的记录数。 |
shardFetchInterval |
持续时间字符串(2m = 2 分钟) | 1 秒 | 更新分片列表之前的等待时间(系统通过此项了解流是否已重设大小)。 |
minFetchPeriod |
持续时间字符串 | 400 毫秒 | 连续提取尝试之间的等待时间。 此设置有助于避免 Kinesis 限制。 200 毫秒是最小值,因为 Kinesis 服务限制为每秒提取 5 次。 |
maxFetchRate |
小数 | 1.0 | 每个分片的最大预提取数据速率(以 MB/秒为单位)。此速率限制可提取并避免 Kinesis 限制。 Kinesis 允许的最大速率为 2.0 MB/秒。 |
maxFetchDuration |
持续时间字符串 | 10 秒 | 在使预提取的新数据可供处理之前进行缓冲的时间 |
fetchBufferSize |
字节字符串 | 20GB | 为下一次触发缓冲的数据量。 这是一个软限制,因为它用作停止条件,因此实际可能会缓冲更多数据。 |
shardsPerTask |
Integer | 5 | 每个任务并行预提取的分片数。 |
重要
minFetchPeriod
可以对 Kinesis 分片创建多个 GetRecords API 调用,直到它达到 ReadProvisionedThroughputExceeded。 如果发生异常,这并不表示存在问题,因为连接器会最大限度地利用 Kinesis 分片。
避免因速率限制错误过多导致减速
每次遇到速率限制错误时,连接器都会将从 Kinesis 读取的数据量减少一半,在日志中记录此事件并显示消息:"Hit rate limit. Sleeping for 5 seconds."
当流掉队时,通常会出现这些错误,但在赶上之后,应该不再看到这些错误。 如果仍存在错误,可能需要从 Kinesis 端进行优化(通过增加容量)或调整预提取选项。
数据过多导致写入磁盘
如果 Kinesis 流突然出现峰值,则分配的缓冲区容量可能会填满,并且缓冲区的清空速度不够快,无法添加新数据。
在这种情况下,Spark 会将块从缓冲区溢出到磁盘并减慢处理速度,这会影响流性能。 此事件显示在日志中,其中包含如下所示的消息:
./log4j.txt:879546:20/03/02 17:15:04 INFO BlockManagerInfo: Updated kinesis_49290928_1_ef24cc00-abda-4acd-bb73-cb135aed175c on disk on 10.0.208.13:43458 (current size: 88.4 MB, original size: 0.0 B)
若要解决此问题,请尝试增加群集内存容量(添加更多节点或增加每个节点的内存),或调整配置参数 fetchBufferSize
。
启用 S3 VPC 终结点
为确保所有 S3 流量都在 AWS 网络上路由,Databricks 建议启用 S3 VPC 终结点。
挂起的 S3 写入任务
挂起的任务可能会导致流式批处理持续时间过长,这可能导致流无法跟上输入。 在这种情况下,Databricks 建议启用 Spark 推测。 为确保任务不会过于激进地终止,请仔细调整此设置的分位数和乘数。 一个好的起点是将 spark.speculation.multiplier
设置为 3
,将 spark.speculation.quantile
设置为 0.95
。
由于 S3 写入速度慢,使用 RocksDB 管理状态时出现延迟问题
在流式处理查询中维护有状态操作时的一个常见场景是大型垃圾回收暂停,这反过来会造成延迟并导致批处理执行时间延长。 这种情况通常发生在维护数百万个密钥状态时。 在这些情况下,请勿在 JVM 内存中维护状态,而是考虑使用 RocksDB 作为本机内存或磁盘中的状态存储。 状态更改会自动传播到结构化流检查点。 但是,当 RocksDB 将这些检查点写入 S3 时,由于潜在的 S3 限制,可能会观察到延迟。 可尝试减少 spark.sql.shuffle.partitions
(默认为 200)以尽量减少写入的文件数。 还可以尝试调整分段上传阈值(spark.hadoop.fs.s3a.multipart.size
,默认为 1048576000 字节)以减少并发 S3 写入数量。
监视流式处理应用程序
要监视流式处理应用程序,Databricks 建议使用 Spark 的流式处理查询侦听器实现。
可观察指标是可以在查询(数据帧)中定义的命名任意聚合函数。 在数据帧的执行达到完成点(即,完成批处理查询或达到流式处理循环)后,会发出一个命名事件,其中包含自上一个完成点以来处理的数据的指标。
可以通过将侦听器附加到 Spark 会话来观察这些指标。 侦听器取决于执行模式:
批处理模式:使用
QueryExecutionListener
。查询完成时调用
QueryExecutionListener
。 使用QueryExecution.observedMetrics
映射访问指标。流式处理或微批处理:使用
StreamingQueryListener
。流式处理查询完成某个循环时调用
StreamingQueryListener
。 使用StreamingQueryProgress.observedMetrics
映射访问指标。 Azure Databricks 不支持连续执行流式处理。
例如:
// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()
// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryProgress(event: QueryProgressEvent): Unit = {
event.progress.observedMetrics.get("my_event").foreach { row =>
// Trigger if the number of errors exceeds 5 percent
val num_rows = row.getAs[Long]("rc")
val num_error_rows = row.getAs[Long]("erc")
val ratio = num_error_rows.toDouble / num_rows
if (ratio > 0.05) {
// Trigger alert
}
}
}
})
还可以通过 UI 监视指标。 如果使用的是 Databricks Runtime 7.0 或更高版本,请使用 Spark UI 中的“流式处理”选项卡。
删除和重新创建流
如果删除并重新创建流,则必须使用新的检查点位置和目录。
重新分片
结构化流支持重新分片。 在这种情况下,增加分片数就足够了。 无需切换流或创建临时流来转移流量。