如何在数据迁移、复制和用例中使用 Kafka MirrorMaker 2.0

MirrorMaker 2.0 (MM2) 旨在更轻松地将主题从一个 Kafka 群集镜像或复制到另一个群集。 它使用 Kafka Connect 框架来简化配置和缩放。 它动态检测主题的更改,并确保源和目标主题属性同步,包括偏移和分区。

本文介绍如何在数据迁移/复制和用例中使用 Kafka MirrorMaker 2.0。

先决条件

  • 环境至少有两个 HDI Kafka 群集。
  • Kafka 版本高于 2.4 (HDI 4.0)
  • 源群集应具有数据点和主题来测试 MirrorMaker 2.0 复制过程的各种功能

用例

模拟 MirrorMaker 2.0 以在 HDInsight 中的两个 Kafka 群集之间复制数据点/偏移。 这样同样可用于像在两个或更多 Kafka 群集之间进行必需的数据复制这样的场景,例如灾难恢复、云适应、异地复制、数据隔离和数据聚合。

使用 MirrorMaker 2.0 进行偏移复制

MM2 内部

MirrorMaker 2.0 工具由不同的连接器组成。 这些连接器是标准 Kafka Connect 连接器,可以在独立模式或分布式模式下直接与 Kafka Connect 配合使用。

代理设置过程的摘要如下:

MirrorSourceConnector:

  1. 复制单个源群集的远程主题、主题 ACL 和配置。
  2. 向内部主题发出偏移同步。

MirrorSinkConnector:

  1. 使用主群集并将主题复制到单个目标群集。

MirrorCheckpointConnector:

  1. 使用 offset-syncsr。
  2. 发出检查点以启用故障转移点。

MirrorHeartBeatConnector:

  1. 向远程群集发出检测信号,从而监视复制过程。

部署

  1. 与 Kafka 库捆绑的 connect-mirror-maker.sh 脚本实现分布式 MM2 群集,该群集根据配置文件在内部管理 Connect 辅助角色。 在内部,MirrorMaker 驱动程序创建和处理每个连接器的配对 - MirrorSourceConnector、MirrorSinkConnector、MirrorCheckpoint 连接器和 MirrorHeartbeatConnector。

  2. 启动 MirrorMaker 2.0。

    ./bin/connect-mirror-maker.sh ./config/mirror-maker.properties    
    

备注

对于已启用 Kerberos 的群集,JAAS 配置必须导出到 KAFKA_OPTS,或必需在 MM2 配置文件中指定。

export KAFKA_OPTS="-Djava.security.auth.login.config=<path-to-jaas.conf>"    

示例 MirrorMaker 2.0 配置文件

# specify any number of cluster aliases
clusters = source, destination

# connection information for each cluster
# This is a comma separated host:port pairs for each cluster
# for example. "A_host1:9092, A_host2:9092, A_host3:9092"  and you can see the exact host name on Ambari > Hosts
source.bootstrap.servers = wn0-src-kafka.bx.internal.chinacloudapp.cn:9092,wn1-src-kafka.bx.internal.chinacloudapp.cn:9092,wn2-src-kafka.bx.internal.chinacloudapp.cn:9092
destination.bootstrap.servers = wn0-dest-kafka.bx.internal.chinacloudapp.cn:9092,wn1-dest-kafka.bx.internal.chinacloudapp.cn:9092,wn2-dest-kafka.bx.internal.chinacloudapp.cn:9092

# enable and configure individual replication flows
source->destination.enabled = true

# regex which defines which topics gets replicated. For eg "foo-.*"
source->destination.topics = toa.evehicles-latest-dev
groups=.*
topics.blacklist="*.internal,__.*"

# Setting replication factor of newly created remote topics
replication.factor=3

checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1

offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1    

SSL 配置

如果设置需要 SSL 配置

destination.security.protocol=SASL_SSL
destination.ssl.truststore.password=<password>
destination.ssl.truststore.location=/path/to/kafka.server.truststore.jks
#keystore location in case client.auth is set to required
destination.ssl.keystore.password=<password> 
destination.ssl.keystore.location=/path/to/kafka.server.keystore.jks
destination.sasl.mechanism=GSSAPI

全局配置

properties 默认值 说明
name 必需 连接器的名称,例如“us-west->us-east”
topics 空字符串 要复制的主题的正则表达式,例如“topic1, topic2, topic3”。 还支持逗号分隔列表。
topics.blacklist “..internal, ..replica, __consumer_offsets”或类似形式 要从复制中排除的主题
groups 空字符串 要复制的组的正则表达式,例如“.*”
groups.blacklist 空字符串 要从复制中排除的组
source.cluster.alias 必需 被复制的群集的名称
target.cluster.alias 必需 下游 Kafka 群集的名称
source.cluster.bootstrap.servers 必需 要复制的上游群集
target.cluster.bootstrap.servers 必需 下游群集
sync.topic.configs.enabled true 是否监视源群集的配置更改
sync.topic.acls.enabled true 是否监视源群集 ACL 的更改
emit.heartbeats.enabled true 连接器应定期发出检测信号
emit.heartbeats.interval.seconds true 检测信号的频率
emit.checkpoints.enabled true 连接器应定期发出使用者偏移信息
emit.checkpoints.interval.seconds 5(秒) 检查点频率
refresh.topics.enabled true 连接器应定期检查是否有新使用者组
refresh.topics.interval.seconds 5(秒) 检查源群集是否有新使用者组的频率
refresh.groups.enabled true 连接器应定期检查是否有新使用者组
refresh.groups.interval.seconds 5(秒) 检查源群集是否有新使用者组的频率
readahead.queue.capacity 500(个记录) 让使用者领先于生产者的记录数
replication.policy.class org.apache.kafka.connect.mirror.DefaultReplicationPolicy 使用 LegacyReplicationPolicy 模拟旧版 MirrorMaker
heartbeats.topic.retention.ms 一天 首次创建检测信号主题时使用
checkpoints.topic.retention.ms 一天 首次创建检查点主题时使用
offset.syncs.topic.retention.ms 最大时长 首次创建偏移同步主题时使用
replication.factor two 创建远程主题时使用

常见问题

为什么在主题的源群集和目标群集帖子复制的最后偏移中会看到差异?

有可能是源主题的数据点已被清除,因此实际记录数将小于最后的偏移值。 这会导致源群集和目标群集帖子复制的最后偏移之间存在差异,因为复制始终从目标群集的偏移值 0 开始。

如果目标群集可能具有映射到数据点的其他偏移,则使用者在迁移时的行为如何?

MirrorMaker 2.0 MirrorCheckpointConnector 功能会自动在源群集上存储使用者组的使用者组偏移检查点。 每个检查点都包含源群集中每个组最后提交的偏移到目标群集中同等偏移的映射。 因此,迁移时,从目标群集上的同一主题开始使用的使用者能够从源群集上提交的最后一个偏移继续接收消息。

由于源别名以复制的所有主题为前缀,如何在目标群集中保留确切的主题名称?

这是 MirrorMaker 2.0 中的默认行为,以避免在复杂的镜像拓扑中重写数据。 需要在复制流设计和主题管理方面小心自定义此项,以避免数据丢失。 可以通过对“replication.policy.class”使用自定义复制策略类来完成此操作。

为什么我们会看到在源和目标 Kafka 中创建的新内部主题?

MirrorMaker 2.0 内部主题由连接器创建,以跟踪复制过程、监视、偏移映射和检查点。

为什么 MirrorMaker 在目标群集中只创建两个主题副本,而源具有更多副本?

MirrorMaker 2 不会将主题的复制因子复制到目标群集。 这可以在 MM2 配置中通过指定所需的“replication.factor”数目来控制。 该复制因子的默认值为 2。

如何在 MirrorMaker 2.0 中使用自定义复制策略?

自定义复制策略可通过实现以下接口来创建。

/** Defines which topics are "remote topics", e.g. "us-west.topic1". */
public interface ReplicationPolicy {

    /** How to rename remote topics; generally should be like us-west.topic1. */
    String formatRemoteTopic(String sourceClusterAlias, String topic);

    /** Source cluster alias of given remote topic, e.g. "us-west" for "us-west.topic1".
        Returns null if not a remote topic.
    */
    String topicSource(String topic);

    /** Name of topic on the source cluster, e.g. "topic1" for "us-west.topic1".
        Topics may be replicated multiple hops, so the immediately upstream topic
        may itself be a remote topic.
        Returns null if not a remote topic.
    */
    String upstreamTopic(String topic);

    /** The name of the original source-topic, which may have been replicated multiple hops.
        Returns the topic if it is not a remote topic.
    */
    String originalTopic(String topic);

    /** Internal topics are never replicated. */
    boolean isInternalTopic(String topic);
}

需要将实现添加到 Kafka 类路径,以便对 MM2 属性中的 replication.policy.class 使用类引用。

后续步骤

什么是 Apache Kafka on HDInsight?

参考

MirrorMaker 2.0 更改 Apache Doc

HDI Kafka 的客户端证书设置

HDInsight Kafka

Apache Kafka 2.4 文档

将本地网络连接到 Azure