Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
本文提供有关优化 HDInsight 中 Apache Kafka 工作负载性能的一些建议。 重点是调整生成者、中转站和使用者配置。 有时,还需要调整操作系统设置,以便在处理繁重工作负荷时优化性能。 可以通过不同的方法来衡量性能,所应用的优化取决于业务需求。
体系结构概述
Kafka 主题用于组织记录。 生产者生产记录,而消费者消费它们。 生成者将记录发送到 Kafka 中转站,然后存储数据。 HDInsight 群集中的每个工作器节点都是 Kafka 代理。
主题将记录分配到多个代理。 使用记录时,每个分区最多可以使用一个使用者来实现数据的并行处理。
复制用于跨节点复制分区。 此分区可防止节点(代理服务器)故障。 副本组中的单个分区被指定为分区前导符。 使用 ZooKeeper 管理的状态将生成者流量路由到每个节点的领导。
确定场景
Apache Kafka 性能具有两个主要方面 - 吞吐量和延迟。 吞吐量是可以处理数据的最大速率。 更高的吞吐量更好。 延迟是存储或检索数据所需的时间。 延迟越低越好。 在吞吐量、延迟和应用程序基础结构成本之间找到适当的平衡可能很有挑战性。 性能要求应符合以下三种常见情况之一,具体取决于是否需要高吞吐量、低延迟或两者:
- 高吞吐量,低延迟。 此方案需要高吞吐量和低延迟(约 100 毫秒)。 这种类型的应用程序的示例是服务可用性监视。
- 高吞吐量、高延迟。 此方案需要高吞吐量(约 1.5 GBps),但可以容忍更高的延迟(< 250 毫秒)。 此类应用程序的一个示例是用于准实时进程的遥测数据引入,例如安全和入侵检测应用程序。
- 低吞吐量,低延迟。 此方案需要低延迟(< 10 毫秒)进行实时处理,但可以容忍较低的吞吐量。 此类应用程序的一个示例是联机拼写和语法检查。
生产者配置
以下部分重点介绍了一些最重要的泛型配置属性,以优化 Kafka 生成者的性能。 有关所有配置属性的详细说明,请参阅 Apache Kafka 文档,了解生成者配置。
批次大小
Apache Kafka 生成者将消息组(称为批处理)组合在一起,这些消息作为要存储在单个存储分区中的单元发送。 批大小表示在传输该组之前必须存在的字节数。
batch.size增加参数可以提高吞吐量,因为它减少了网络和 IO 请求的处理开销。 在轻负载下,增加的批大小可能会增加 Kafka 发送延迟,因为生成者等待批准备就绪。 在负载过大的情况下,建议增加批大小以提高吞吐量和延迟。
生产者需要确认
生产者需要的 acks 配置决定了写入请求被认为完成之前,分区领导者所需的确认数量。 此设置会影响数据可靠性,并且它可以接受0、1或-1值。
-1的价值意味着在写入完成之前,必须从所有副本接收到确认。 设置 acks = -1 可提供更严格的数据丢失保证,但也会导致更高的延迟和较低的吞吐量。 如果应用程序要求更高的吞吐量,请尝试设置 acks = 0 或 acks = 1。 请记住,不确认所有副本可以降低数据可靠性。
Compression
可以将 Kafka 生成者配置为在将消息发送到中转站之前对其进行压缩。 该 compression.type 设置指定要使用的压缩编解码器。 支持的压缩编解码器是“gzip”、“snappy”和“lz4”。压缩是有益的,如果磁盘容量存在限制,则应考虑压缩。
在两个常用的压缩编解码器中, gzipsnappy具有gzip更高的压缩率,这会导致磁盘使用率降低,代价是 CPU 负载较高。 编解码器 snappy 提供更少的压缩,CPU 开销更少。 可以根据中转站磁盘或生成者 CPU 限制确定要使用的编解码器。
gzip 可以按高于 5 倍 snappy的速度压缩数据。
数据压缩会增加可存储在磁盘上的记录数。 在生成者和中转站使用的压缩格式不匹配的情况下,它还可能会增加 CPU 开销。 因为必须先压缩数据,然后才能发送,然后在处理之前解压缩。
代理设置
以下部分重点介绍了优化 Kafka 代理性能的一些最重要的设置。 有关所有代理设置的详细说明,请参阅 Apache Kafka 有关代理配置的文档。
磁盘数目
存储磁盘的 IOPS(每秒输入/输出操作数)和每秒读/写字节数有限。 创建新分区时,Kafka 会将每个新分区存储在现有分区最少的磁盘上,以便在可用磁盘之间平衡这些分区。 尽管存储策略是,在处理每个磁盘上的数百个分区副本时,Kafka 可以轻松饱和可用的磁盘吞吐量。 此处的权衡在于吞吐量和成本。 如果应用程序需要更大的吞吐量,请为每个中转站创建具有更多托管磁盘的群集。 HDInsight 目前不支持将托管磁盘添加到正在运行的群集。 有关如何配置托管磁盘数的详细信息,请参阅 配置 Apache Kafka on HDInsight 的存储和可伸缩性。 了解增加群集中节点存储空间的成本影响。
主题和分区数
Kafka 生产者将数据写入主题。 Kafka 使用者从主题中读取。 主题与日志相关联,该日志是磁盘上的数据结构。 Kafka 将生产者的记录追加到主题日志的末尾。 主题日志包含分布在多个文件的多个分区。 这些文件又分布在多个 Kafka 群集节点中。 使用者按节奏从 Kafka 主题中读取,并且可以在主题日志中选取其位置(偏移量)。
每个 Kafka 分区都是系统上的日志文件,生成者线程可以同时写入多个日志。 同样,由于每个使用者线程从一个分区读取消息,因此也并行处理来自多个分区的消息。
增加分区密度(每个代理的分区数)会增加与元数据操作相关的开销,以及分区主节点与其追随者之间的每个分区请求/响应。 即使在没有数据传输的情况下,分区副本仍从主副本提取数据,这会导致网络发送和接收请求的额外处理。
对于 Apache Kafka 群集 2.1 和 2.4,如 HDInsight 中所述,我们建议每个代理最多有 2000 个分区,包括副本。 增加每个代理的分区数会降低吞吐量,并可能导致主题不可用。 有关 Kafka 分区支持的详细信息,请参阅 有关版本 1.1.0 中受支持分区数量的增加的官方 Apache Kafka 博客文章。 有关修改主题的详细信息,请参阅 Apache Kafka:修改主题。
副本数
较高的复制因子会导致分区领导者和追随者之间的额外请求。 因此,较高的复制因子消耗更多的磁盘和 CPU 来处理其他请求、增加写入延迟和降低吞吐量。
我们建议在 Azure HDInsight 中对 Kafka 使用至少 3 倍的副本。 大多数Azure区域都有三个容错域,但在只有两个容错域的区域中,用户应使用 4 倍复制。
有关复制的详细信息,请参阅 Apache Kafka:复制 和 Apache Kafka:增加复制因子。
使用者配置
以下部分重点介绍了一些重要的泛型配置,以优化 Kafka 使用者的性能。 有关所有配置的详细说明,请参阅 Apache Kafka 有关使用者配置的文档。
使用者数
最好将分区数等于使用者数。 如果消费者数量小于分区数量,则一些消费者将会从多个分区读取数据,从而增加了消费者的延迟。
如果使用者数大于分区数,则会浪费使用者资源,因为这些使用者处于空闲状态。
避免频繁的使用者重新平衡
消费者重新平衡是由以下因素触发的:分区所有权的变化(即消费者的扩展或缩减)、代理崩溃(因为代理是消费者组的组协调者)、消费者崩溃、添加新主题或新增分区。 在重新均衡期间,使用者无法使用,因此会增加延迟。
如果消费者可以在 session.timeout.ms 内向代理发送心跳信号,则认为消费者处于活动状态。 否则,使用者被视为死或失败。 这种延迟会导致消费者重新平衡。 降低消费者 session.timeout.ms 负荷,我们能更快地检测到这些故障。
如果session.timeout.ms太低,消费者可能会由于某些情况而遭遇重复和不必要的重新平衡,比如一批消息处理时间较长或者 JVM GC 暂停时间过长。 如果你有一个花费太多时间处理消息的使用者,可以通过增加使用者在获取更多记录 max.poll.interval.ms 之前空闲的时间上限,或者减少使用配置参数 max.poll.records返回的最大批大小来解决此问题。
批处理
与生成者一样,我们可以为使用者添加批处理。 可以通过更改配置 fetch.min.bytes来设置每个获取请求中消费者可接收的数据量。 此参数定义使用者的提取响应所需的最小字节数。 增加此值可减少向中转站发出的提取请求数,从而减少额外的开销。 默认情况下,此值为 1。 同样,还有另一个配置 fetch.max.wait.ms。 如果提取请求的消息数量小于fetch.min.bytes指定的大小,它将根据此配置等待至fetch.max.wait.ms设定的等待时间到期。
注释
在少数情况下,使用者处理消息失败时,速度显得很慢。 如果在发生异常后未提交偏移量,消费者将卡在一个特定偏移量的无限循环中,并且不会向前移动,从而增加消费者端的延迟。
针对繁重工作负荷调优 Linux OS
内存映射
vm.max_map_count 定义了一个进程可以拥有的最大 mmap 的数量。 默认情况下,在 HDInsight Apache Kafka 群集 linux VM 上,值为 65535。
在 Apache Kafka 中,每个日志段都需要一对索引文件和时间索引文件,其中每个文件都使用一个 mmap。 换句话说,每个日志段使用两个 mmap。 因此,如果每个分区托管单个日志段,则至少需要两个 mmap。 每个分区的日志段数因 段大小、负载强度、保留策略、滚动周期 和通常多于一个而有所不同。 Mmap value = 2*((partition size)/(segment size))*(partitions)
如果需要的 mmap 值超过 vm.max_map_count,中转站将引发 “映射失败” 异常。
若要避免出现此异常,请使用以下命令检查 vm 中 mmap 的大小,并在每个工作器节点上根据需要增加大小。
# command to find number of index files:
find . -name '*index' | wc -l
# command to view vm.max_map_count for a process:
cat /proc/[kafka-pid]/maps | wc -l
# command to set the limit of vm.max_map_count:
sysctl -w vm.max_map_count=<new_mmap_value>
# This will make sure value remains, even after vm is rebooted:
echo 'vm.max_map_count=<new_mmap_value>' >> /etc/sysctl.conf
sysctl -p
注释
请谨慎设置这个值过高,因为这会占用 VM 的内存。 JVM 允许在内存映射上使用的内存量由设置 MaxDirectMemory决定。 默认值为 64MB。 有可能达到这个结果。 可以通过 Ambari 将 -XX:MaxDirectMemorySize=amount of memory used 添加到 JVM 设置中来增加该值。 请注意节点上使用的内存量,以及是否有足够的可用 RAM 来支持系统需求。