Compartir a través de

Apache Kafka HDInsight 群集的性能优化

本文提供有关优化 HDInsight 中 Apache Kafka 工作负荷的性能的一些建议, 本文重点介绍如何调整生成者、中介和使用者配置。 有时,还需要调整 OS 设置以优化繁重工作负载的性能。 可通过不同的方法来衡量性能,运用的优化方法取决于业务需求。

体系结构概述

Kafka 主题用于对记录进行组织。 生成者生成记录,使用者使用这些记录。 生成者将记录发送到 Kafka 代理,后者存储数据。 HDInsight 群集中的每个辅助角色节点都是一个 Kafka 中转站。

主题跨代理对记录进行分区。 在使用记录时,每个分区最多可使用一个使用者来实现数据并行处理。

复制用于在节点之间复制分区。 此分区可以防止节点(代理)发生服务中断。 将副本组之间的单个分区指定为分区领先者。 生成方流量将根据由 ZooKeeper 管理的状态路由到每个节点的前导者。

确定方案

Apache Kafka 性能体现在两个主要方面 - 吞吐量和延迟。 吞吐量是指数据的最大处理速率。 吞吐量越高越好。 延迟是指存储或检索数据所花费的时间。 延迟越低越好。 在吞吐量、延迟和应用基础结构的开销方面找到适当的平衡可能会有难度。 根据追求的是高吞吐量、低延迟还是此两者,性能要求应符合以下三种常见情况中的一种:

  • 高吞吐量,低延迟。 此方案要求同时满足高吞吐量和低延迟(大约 100 毫秒)。 服务可用性监视就是这种应用场景的一个例子。
  • 高吞吐量,高延迟。 此方案要求满足高吞吐量(大约 1.5 GBps),但可以容许较高的延迟(< 250 毫秒)。 这种应用场景的一个例子是引入遥测数据进行近实时的处理,例如安全与入侵检测应用程序。
  • 低吞吐量,低延迟。 此方案要求提供低延迟(< 10 毫秒)以完成实时处理,但可以容许较低的吞吐量。 在线拼写和语法检查就是这种应用场景的一个例子。

生成者配置

以下部分重点介绍了一些用于优化 Kafka 生成者性能的最重要的通用配置属性。 有关所有配置属性的详细说明,请参阅有关生成者配置的 Apache Kafka 文档

批大小

Apache Kafka 生成者将作为一个单元发送的消息组(称为批)汇编到一起,以将其存储在单个存储分区中。 批大小表示在传输该组之前必须达到的字节数。 增大 batch.size 参数可以提高吞吐量,因为这可以降低网络和 IO 请求的处理开销。 负载较轻时,增大批大小可能会增大 Kafka 发送延迟,因为生成者需要等待某个批准备就绪。 负载较重时,建议增大批大小以改善吞吐量和延迟。

生成者要求的确认

生成者所需的 acks 配置确定在将某个写入请求视为已完成之前,分区领先者所需的确认数目。 此设置会影响数据可靠性,其值为 01-1。 值 -1 表示必须收到所有副本的确认,才能将写入请求视为已完成。 设置 acks = -1 能够更可靠地保证数据不会丢失,但同时也会导致延迟增大,吞吐量降低。 如果应用场景要求提供较高的吞吐量,请尝试设置 acks = 0acks = 1。 请记住,不确认所有副本可能会降低数据可靠性。

压缩

可将 Kafka 生成者配置为先压缩消息,然后再将消息发送到代理。 compression.type 设置指定要使用的压缩编解码器。 受支持的压缩编解码器为“gzip”、“snappy”和“lz4”。如果磁盘容量存在限制,则压缩是有利的做法,应予以考虑。

gzipsnappy 这两个常用的压缩编解码器中,gzip 的压缩率更高,它可以降低磁盘用量,但代价是使 CPU 负载升高。 snappy 编解码器的压缩率更低,但造成的 CPU 开销更低。 可以根据代理磁盘或生成者的 CPU 限制来决定使用哪个编解码器。 gzip 数据压缩率比 snappy 高 5 倍。

数据压缩会增加磁盘中可存储的记录数。 如果生成者与代理使用的压缩格式不匹配,则数据压缩也会增大 CPU 开销。 因为数据在发送之前必须经过压缩,并在处理之前经过解压缩。

代理设置

以下部分重点介绍了一些用于优化 Kafka 代理性能的最重要设置。 有关所有中转站设置的详细说明,请参阅有关中转站配置的 Apache Kafka 文档

磁盘数目

存储磁盘的 IOPS(每秒输入/输出操作次数)和每秒读/写字节数有限制。 创建新分区时,Kafka 会将每个新分区存储在现有分区最少的磁盘上,以便在可用磁盘之间平衡分区的数目。 尽管有存储策略进行调节,但在处理每个磁盘上的数百个分区副本时,Kafka 很容易就会使可用磁盘吞吐量达到饱和。 此时,需要在吞吐量与成本之间进行取舍。 如果应用场景需要更大的吞吐量,请创建一个可为每个代理提供更多托管磁盘的群集。 HDInsight 目前不支持将托管磁盘添加到正在运行的群集。 有关如何配置托管磁盘数目的详细信息,请参阅为 HDInsight 上的 Apache Kafka 配置存储和可伸缩性。 了解为群集中的节点增加存储空间所造成的成本影响。

主题和分区的数目

Kafka 生成者将写入主题。 Kafka 使用者读取主题。 主题与日志相关联,该日志是磁盘上的数据结构。 Kafka 将生成者中的记录追加到主题日志的末尾。 主题日志包括分散在多个文件之间的多个分区。 而这些文件又分散在多个 Kafka 群集节点之间。 使用者可以按照自己的节奏从 Kafka 主题中读取内容,并可以在主题日志中选择自己的位置(偏移)。

每个 Kafka 分区是在系统上的一个日志文件,生成者线程可以同时写入到多个日志。 同样,由于每个使用者线程从一个分区读取消息,因此也能并行处理从多个分区使用消息的操作。

提高分区密度(每个代理的分区数)会增大与元数据操作以及每个分区领先者及其后继者之间的分区请求/响应相关的开销。 即使不存在流动的数据,分区副本也仍会从领先者提取数据,导致需要通过网络额外处理发送和接收请求。

对于 HDInsight 中的 Apache Kafka 群集 2.1 和 2.4 以及之前所述,建议最多为每个代理提供 2000 个分区(包括副本)。 增加每个代理的分区数会降低吞吐量,并可能导致主题不可用。 有关 Kafka 分区支持的详细信息,请参阅有关在版本 1.1.0 中增加受支持分区数目的官方 Apache Kafka 博客文章。 有关修改主题的详细信息,请参阅 Apache Kafka:修改主题

副本数

较高的复制因子会导致分区领先者与后继者之间的请求数增加。 因而,较高的复制因子会消耗更多的磁盘和 CPU 来处理额外的请求,并增大写入延迟,降低吞吐量。

我们建议对 Azure HDInsight 中的 Kafka 至少使用 3 倍的复制因子。 大部分 Azure 区域有三个容错域。在只有两个容错域的区域中,用户应使用 4 倍复制因子。

有关复制的详细信息,请参阅 Apache Kafka:复制Apache Kafka:增大复制因子

使用者配置

以下部分重点介绍了一些重要的通用配置,以优化 Kafka 使用者的性能。 有关所有配置的详细说明,请参阅有关使用者配置的 Apache Kafka 文档

使用者数量

良好的做法是让分区数量等于使用者数量。 如果使用者数小于分区数,少数使用者会从多个分区读取内容,从而增加使用者延迟。

如果使用者数大于分区数,会浪费使用者资源,因为这些使用者处于空闲状态。

避免频繁进行使用者重新平衡

发生分区所有权更改(即使用者横向扩展或纵向缩减)、中介崩溃(因为中介是使用者组的组协调者)、使用者崩溃、添加新主题或添加新分区时,会触发使用者重新平衡。 在重新平衡期间,使用者无法使用,因此会增大延迟。

如果使用者可以在 session.timeout.ms 内向中介发送检测信号,则认为使用者是存活的。 否则,使用者被视为死或失败。 这种延迟会导致使用者重新平衡。 使用者session.timeout.ms越低,我们就能越快地检测到这些故障。

如果 session.timeout.ms 太低,使用者可能会由于某些情况(例如,处理一批消息花费了较长时间,或 JVM GC 暂停时间太长)而遇到重复的不必要重新平衡。 如果你的某个使用者花费了过多时间处理消息,你可以通过以下方法解决此问题:使用 max.poll.interval.ms 提高使用者在获取更多记录之前可以保持空闲状态的时长上限,或使用配置参数 max.poll.records 减小返回的最大批大小。

批处理

与生成者一样,可为使用者添加批处理。 可以通过更改配置 fetch.min.bytes,来配置使用者在每个提取请求中可获取的数据量。 该参数定义了使用者提取响应中预期的最小字节数。 增加此值可减少向中转站发出的提取请求数,从而减少额外开销。 默认情况下,此值为 1。 类似地,还有另一个配置 fetch.max.wait.ms。 如果提取请求没有足够的消息(根据fetch.min.bytes的大小),它会一直等待,直至基于fetch.max.wait.ms配置的等待时间过期。

注意

在少数情况下,当使用者无法处理消息时,它看起来就很缓慢。 如果在发生异常后你不提交偏移量,使用者将停滞在无限循环中的特定偏移量而不会前进,从而增大使用者端的滞后时间。

繁重工作负载的 Linux OS 优化

内存映射

vm.max_map_count 定义了进程可以拥有的最大 mmap 数量。 默认情况下,在 HDInsight Apache Kafka 群集 Linux VM 上,该值为 65535。

在 Apache Kafka 中,每个日志段都需要一对索引/timeindex 文件,其中每个文件都使用一个 mmap。 换句话说,每个日志段都使用两个 mmap。 因此,如果每个分区都托管单个日志段,至少需要两个 mmap。 每个分区的日志段数取决于段大小、负载强度、保留策略和滚动期,该数量通常大于 1。 Mmap value = 2*((partition size)/(segment size))*(partitions)

如果所需的 mmap 值超过 vm.max_map_count,中介将引发“映射失败”异常。

为避免此异常,请使用以下命令检查 VM 中 mmap 的大小,并根据需要在每个工作器节点上增大该大小。

# command to find number of index files:
find . -name '*index' | wc -l

# command to view vm.max_map_count for a process:
cat /proc/[kafka-pid]/maps | wc -l

# command to set the limit of vm.max_map_count:
sysctl -w vm.max_map_count=<new_mmap_value>

# This will make sure value remains, even after vm is rebooted:
echo 'vm.max_map_count=<new_mmap_value>' >> /etc/sysctl.conf
sysctl -p

注意

请注意不要将此值设置得过高,因为这会占用 VM 上的内存。 通过设置 MaxDirectMemory 来确定 JVM 可以在内存映射中使用的内存量。 默认值为 64MB。 有可能会达到此限制。 可以通过 Ambari 将 -XX:MaxDirectMemorySize=amount of memory used 添加到 JVM 设置来增大此值。 需要知道节点上正在使用的内存量,以及是否有足够的可用 RAM 来支持增大此值。

后续步骤