使用 MirrorMaker 2 在不同的 Azure HDInsight 版本之间迁移 Kafka 群集
了解如何使用 Apache Kafka 镜像功能将主题复制到辅助群集。 你可以运行镜像作为一个连续的进程,或者间歇性地将数据从一个群集迁移到另一个群集。
在本文中,我们使用镜像在两个 HDInsight 群集之间复制主题。 这两个群集位于不同数据中心内的不同虚拟网络中。
备注
- 可以使用镜像群集作为容错。
- 这仅在主群集 HDI Kafka 2.4.1、3.2.0 和辅助群集为 HDI Kafka 3.2.0 版本时有效。
- 如果主群集发生故障,辅助群集将无缝工作。
- 使用者组偏移将自动转换为辅助群集。
- 只需将主要群集使用者指向具有相同使用者组的辅助群集,使用者组就会从它退出主群集的偏移处开始使用。
- 唯一的区别在于备份群集中的主题名称将从 TOPIC_NAME 更改为 primary-cluster-name.TOPIC_NAME。
镜像的工作方式是使用 MirrorMaker2 工具,该工具属于 Apache Kafka。 MirrorMaker 使用来自主集群上的主题的记录,然后在辅助集群上创建一个本地副本。 MirrorMaker2 使用一个(或多个)使用者从主要群集读取记录,使用一个生成者将记录写入本地(辅助)群集。
最有用的灾难恢复镜像设置使用不同 Azure 区域中的 Kafka 群集。 为获得此结果,群集所在的虚拟网络将对等互连到一起。
主要和辅助群集在节点与分区数目方面可以不同,主题中的偏移也可以不同。 镜像维护用于分区的密钥值,因此会按密钥来保留记录顺序。
如果需要在不同网络中的 Kafka 群集之间执行镜像操作,还需要考虑以下注意事项:
网关:网络必须能够在 TCP/IP 级别通信。
服务器寻址:可选择使用群集节点的 IP 地址或完全限定的域名来寻址这些节点。
IP 地址:如果将 Kafka 群集配置为使用 IP 地址播发,可使用代理节点和 Zookeeper 节点的 IP 地址继续进行镜像设置。
域名称:如果未在 Kafka 群集上配置 IP 地址播发,则群集必须能够使用完全限定的域名 (FQDN) 相互连接。 这需要在每个网络中设置一台域名系统 (DNS) 服务器,并将其配置为向其他网络转发请求。 创建 Azure 虚拟网络时,必须指定自定义 DNS 服务器和服务器的 IP 地址,而不是使用网络提供的自动 DNS。 在创建虚拟网络之后,必须创建一个使用该 IP 地址的 Azure 虚拟机。 然后在上面安装和配置 DNS 软件。
重要
在将 HDInsight 安装到虚拟网络之前,需先创建和配置自定义 DNS 服务器。 HDInsight 不需要再进行其他配置,便可使用为虚拟网络配置的 DNS 服务器。
有关连接两个 Azure 虚拟网络的详细信息,请参阅配置连接。
此体系结构在不同的资源组和虚拟网络中配置两个群集:主要群集和辅助群集。
创建两个新的资源组:
资源组 位置 kafka-primary-rg 中国北部 kafka-secondary-rg 中国北部 在 kafka-primary-rg 中创建新的虚拟网络 kafka-primary-vnet。 保留默认设置。
在 kafka-secondary-rg 中创建新的虚拟网络 kafka-secondary-vnet,也保留默认设置。
备注
保持两个 VNet 的地址不重叠,否则 VNet 对等互连将不起作用。 示例:
- kafka-primary-vnet 的地址空间可以为 10.0.0.0
- kafka-secondary-vnet 的地址空间可以为 10.1.0.0
创建虚拟网络对等互连。 此步骤创建两个对等互连:一个从 kafka-primary-vnet 连接到 kafka-secondary-vnet,一个从 kafka-secondary-vnet 连接回到 kafka-primary-vnet。
选择“kafka-primary-vnet”虚拟网络。
在“设置”下选择“对等互连”。
选择“添加” 。
在“添加对等互连”屏幕上输入详细信息,如以下屏幕截图所示。
创建两个新的 Kafka 群集:
群集名称 HDInsight 版本 资源组 虚拟网络 存储帐户 primary-kafka-cluster 5.0 kafka-primary-rg kafka-primary-vnet kafkaprimarystorage secondary-kafka-cluster 5.1 kafka-secondary-rg kafka-secondary-vnet kafkasecondarystorage 备注
从现在起,我们将使用
primary-kafka-cluster
作为PRIMARYCLUSTER
,使用secondary-kafka-cluster
作为SECONDARYCLUSTER
。
使用
SECONDARYCLUSTER
的头节点运行 Mirror Maker 脚本。 然后,我们需要SECONDARYCLUSTER
的/etc/hosts
文件中 PRIMARYCLUSTER 的工作器节点的 IP 地址。连接到
PRIMARYCLUSTER
ssh sshuser@PRIMARYCLUSTER-ssh.azurehdinsight.cn
执行以下命令,获取工作节点 IP 和 FQDN
cat /etc/hosts
的条目复制这些条目,连接到
SECONDARYCLUSTER
并运行ssh sshuser@SECONDARYCLUSTER-ssh.azurehdinsight.cn`
编辑辅助群集的
/etc/hosts
文件并在此处添加这些条目。进行更改后,
SECONDARYCLUSTER
的/etc/hosts
文件外观将与给定的图像相似。保存并关闭该文件。
使用此命令创建主题并替换变量。
bash /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper $KAFKAZKHOSTS --create --topic $TOPICNAME --partitions $NUM_PARTITIONS --replication-factor $REPLICATION_FACTOR
现在更改 MirrorMaker2 属性文件中的配置。
使用管理员权限执行以下命令
sudo su vi /etc/kafka/conf/connect-mirror-maker.properties
备注
本文包含一个对 Azure 不再使用的术语的引用。 在从软件中删除该术语后,我们会将其从本文中删除。
属性文件如下所示。
# specify any number of cluster aliases clusters = source, destination # connection information for each cluster # This is a comma separated host:port pairs for each cluster # for example. "A_host1:9092, A_host2:9092, A_host3:9092" and you can see the exact host name on Ambari > Hosts source.bootstrap.servers = wn0-src kafka.bx.internal.chinacloudapp.cn:9092,wn1-src-kafka.bx.internal.chinacloudapp.cn:9092,wn2-src-kafka.bx.internal.chinacloudapp.cn:9092 destination.bootstrap.servers = wn0-dest-kafka.bx.internal.chinacloudapp.cn:9092,wn1-dest-kafka.bx.internal.chinacloudapp.cn:9092,wn2-dest-kafka.bx.internal.chinacloudapp.cn:9092 # enable and configure individual replication flows source->destination.enabled = true # regex which defines which topics gets replicated. For eg "foo-.*" source->destination.topics = .* groups=.* topics.blacklist="*.internal,__.*" # Setting replication factor of newly created remote topics Replication.factor=3 checkpoints.topic.replication.factor=1 heartbeats.topic.replication.factor=1 offset-syncs.topic.replication.factor=1 offset.storage.replication.factor=1 status.storage.replication.factor=1 config.storage.replication.factor=1
这里的源是你的
PRIMARYCLUSTER
,目标是你的SECONDARYCLUSTR
。 将每个位置的它替换为正确的名称并将source.bootstrap.servers
和destination.bootstrap.servers
替换为其各自工作器节点的正确 FQDN 或 IP。可以使用正则表达式指定要复制的主题及其配置。 通过将
replication.factor
参数设置为 3,可以确保 MirrorMaker 脚本创建的所有主题的复制因子为 3。将这些主题的复制因子从 1 增加到 3
checkpoints.topic.replication.factor=1 heartbeats.topic.replication.factor=1 offset-syncs.topic.replication.factor=1 offset.storage.replication.factor=1 status.storage.replication.factor=1 config.storage.replication.factor=1
备注
成为中转站级别所有主题的默认不同步副本的原因是 2。 保持复制因子=1 会在运行 mirrormaker2 时引发异常
需要启用自动创建主题功能,然后 Mirror Maker 脚本会在辅助群集中复制名称为
PRIMARYCLUSTER.TOPICNAME
且具有相同配置的主题。 保存文件,然后就可以使用配置了。若要在两侧镜像主题,例如
Primary to Secondary
和Secondary to Primary
(主动-主动),可以添加额外的配置destination->source.enabled=true destination->source.topics = .*
对于自动执行的使用者偏移同步,也需要启用复制并控制同步持续时间。 以下属性将每 30 秒同步偏移一次。 对于主动-主动方案,我们需要通过两种方式执行此操作。
groups=.* emit.checkpoints.enabled = true source->destination.sync.group.offsets.enabled = true source->destination.sync.group.offsets.interval.ms=30000 destination->source.sync.group.offsets.enabled = true destination->source.sync.group.offsets.interval.ms=30000
如果不希望跨群集复制内部主题,请使用以下属性
topics.blacklist="*.internal,__.*"
更改后的最终配置文件应如下所示
# specify any number of cluster aliases clusters = primary-kafka-cluster, secondary-kafka-cluster # connection information for each cluster # This is a comma separated host:port pairs for each cluster # for example. "A_host1:9092, A_host2:9092, A_host3:9092" and you can see the exact host name on Ambari -> Hosts primary-kafka-cluster.bootstrap.servers = wn0-src-kafka.bx.internal.chinacloudapp.cn:9092,wn1-src-kafka.bx.internal.chinacloudapp.cn:9092,wn2-src-kafka.bx.internal.chinacloudapp.cn:9092 secondary-kafka-cluster.bootstrap.servers = wn0-dest-kafka.bx.internal.chinacloudapp.cn:9092,wn1-dest-kafka.bx.internal.chinacloudapp.cn:9092,wn2-dest-kafka.bx.internal.chinacloudapp.cn:9092 # enable and configure individual replication flows primary-kafka-cluster->secondary-kafka-cluster.enabled = true # enable this for both side replication secondary-kafka-cluster->primary-kafka-cluster.enabled = true # regex which defines which topics gets replicated. For eg "foo-.*" primary-kafka-cluster->secondary-kafka-cluster.topics = .* secondary-kafka-cluster->primary-kafka-cluster.topics = .* groups=.* emit.checkpoints.enabled = true primary-kafka-cluster->secondary-kafka-cluster.sync.group.offsets.enabled=true primary-kafka-cluster->secondary-kafka-cluster.sync.group.offsets.interval.ms=30000 secondary-kafka-cluster->primary-kafka-cluster.sync.group.offsets.enabled = true secondary-kafka-cluster->primary-kafka-cluster.sync.group.offsets.interval.ms=30000 topics.blacklist="*.internal,__.*" # Setting replication factor of newly created remote topics Replication.factor=3 checkpoints.topic.replication.factor=3 heartbeats.topic.replication.factor=3 offset-syncs.topic.replication.factor=3 offset.storage.replication.factor=3 status.storage.replication.factor=3 config.storage.replication.factor=3
在
SECONDARYCLUSTER
中启动 Mirror Maker2,它应该可以正常运行/usr/hdp/current/kafka-broker ./bin/connect-mirror-maker.sh ./config/connect-mirror-maker.properties
现在在
PRIMARYCLUSTER
中启动生成者export clusterName='primary-kafka-cluster' export TOPICNAME='TestMirrorMakerTopic' export KAFKABROKERS='wn0-primar:9092' export KAFKAZKHOSTS='zk0-primar:2181' //Start Producer # For Kafka 2.4 bash /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --zookeeper $KAFKAZKHOSTS --topic $TOPICNAME # For Kafka 3.2 bash /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --boostrap-server $KAFKABROKERS --topic $TOPICNAME
现在,通过使用者组启动 PRIMARYCLUSTER 中的使用者
//Start Consumer # For Kafka 2.4 bash /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper $KAFKAZKHOSTS --topic $TOPICNAME --group my-group --from- beginning # For Kafka 3.2 bash /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --boostrap-server $KAFKABROKERS --topic $TOPICNAME --group my-group --from-beginning
现在,停止 PRIMARYCONSUMER 中的使用者,然后使用同一使用者组启动 SECONDARYCLUSTER 中的使用者
export clusterName='secondary-kafka-cluster' export TOPICNAME='primary-kafka-cluster.TestMirrorMakerTopic' export KAFKABROKERS='wn0-second:9092' export KAFKAZKHOSTS='zk0-second:2181' # List all the topics whether they're replicated or not bash /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper $KAFKAZKHOSTS --list # Start Consumer bash /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $KAFKABROKERS --topic $TOPICNAME --from-beginning
你会注意到,在辅助群集使用者组中,my-group 无法使用任何消息,因为消息已由主群集使用者组使用。 现在,可在主群集中生成更多消息,并尝试在辅助群集中使用它们。 可以从
SECONDARYCLUSTER
开始使用。
警告
HDInsight 群集是基于分钟按比例计费,而不管用户是否使用它们。 请务必在使用完群集之后将其删除。 请参阅如何删除 HDInsight 群集。
本文中的步骤已在不同的 Azure 资源组中创建了群集。 若要删除创建的所有资源,可以删除创建的两个资源组:kafka-primary-rg 和 kafka-secondary_rg。 删除资源组会删除遵循本文创建的所有资源,包括群集、虚拟网络和存储帐户。
本文介绍了如何使用 MirrorMaker2 创建 Apache Kafka 群集的副本。
使用以下链接来发现与 Kafka 配合使用的其他方式: