使用 MirrorMaker 通过 Kafka on HDInsight 复制 Apache Kafka 主题

了解如何使用 Apache Kafka 镜像功能将主题复制到辅助群集。 你可以运行镜像作为一个连续的进程,或者间歇性地将数据从一个群集迁移到另一个群集。

在本文中,你将使用镜像在两个 HDInsight 群集之间复制主题。 这两个群集位于不同数据中心内的不同虚拟网络中。

警告

不要使用镜像作为实现容错的一种手段。 主题中项的偏移在主要群集与辅助群集之间有所不同,因此客户端不能换用这两种群集。 如果担心容错,应在群集内为主题设置复制。 有关详细信息,请参阅 Apache Kafka on HDInsight 入门

Apache Kafka 镜像的工作原理

镜像的工作方式是使用 MirrorMaker 工具,该工具属于 Apache Kafka。 MirrorMaker 使用来自主集群上的主题的记录,然后在辅助集群上创建一个本地副本。 MirrorMaker 使用一个或多个使用者从主要群集读取记录,使用生成者将记录写入本地(辅助)群集。

最有用的灾难恢复镜像设置使用不同 Azure 区域中的 Kafka 群集。 为实现此目的,群集所在的虚拟网络将对等互连到一起。

下图演示了镜像过程以及群集之间的通信流动方式:

Diagram of the mirroring process.

主要和辅助群集在节点与分区数目方面可以不同,主题中的偏移也可以不同。 镜像维护用于分区的密钥值,因此会按密钥来保留记录顺序。

跨网络边界执行镜像操作

如果需要在不同网络中的 Kafka 群集之间执行镜像操作,还需要考虑以下注意事项:

  • 网关:网络必须能够在 TCP/IP 级别通信。

  • 服务器寻址:可选择使用群集节点的 IP 地址或完全限定的域名来寻址这些节点。

    • IP 地址:如果将 Kafka 群集配置为使用 IP 地址播发,可使用代理节点和 Zookeeper 节点的 IP 地址继续进行镜像设置。

    • 域名称:如果未在 Kafka 群集上配置 IP 地址播发,则群集必须能够使用完全限定的域名 (FQDN) 相互连接。 这需要在每个网络中设置一台域名系统 (DNS) 服务器,并将其配置为向其他网络转发请求。 创建 Azure 虚拟网络时,必须指定自定义 DNS 服务器和服务器的 IP 地址,而不是使用网络提供的自动 DNS。 在创建虚拟网络之后,必须创建一个使用该 IP 地址的 Azure 虚拟机。 然后在上面安装和配置 DNS 软件。

    重要

    在将 HDInsight 安装到虚拟网络之前,需先创建和配置自定义 DNS 服务器。 HDInsight 不需要再进行其他配置,便可使用为虚拟网络配置的 DNS 服务器。

有关连接两个 Azure 虚拟网络的详细信息,请参阅配置连接

镜像体系结构

此体系结构在不同的资源组和虚拟网络中配置两个群集:主要群集和辅助群集。

创建步骤

  1. 创建两个新的资源组:

    资源组 位置
    kafka-primary-rg 中国北部
    kafka-secondary-rg 中国北部
  2. kafka-primary-rg 中创建新的虚拟网络 kafka-primary-vnet。 保留默认设置。

  3. kafka-secondary-rg 中创建新的虚拟网络 kafka-secondary-vnet,也保留默认设置。

  4. 创建两个新的 Kafka 群集:

    群集名称 资源组 虚拟网络 存储帐户
    kafka-primary-cluster kafka-primary-rg kafka-primary-vnet kafkaprimarystorage
    kafka-secondary-cluster kafka-secondary-rg kafka-secondary-vnet kafkasecondarystorage
  5. 创建虚拟网络对等互连。 此步骤创建两个对等互连:一个从 kafka-primary-vnet 连接到 kafka-secondary-vnet,一个从 kafka-secondary-vnet 连接回到 kafka-primary-vnet

    1. 选择“kafka-primary-vnet”虚拟网络。

    2. 在“设置”下选择“对等互连”。

    3. 选择“添加” 。

    4. 在“添加对等互连”屏幕上输入详细信息,如以下屏幕截图所示。

      Screenshot that shows H D Insight Kafka add virtual network peering.

配置 IP 播发

配置 IP 播发,使客户端可以使用中转站 IP 地址而不是域名进行连接。

  1. 转到主要群集的 Ambari 仪表板:https://PRIMARYCLUSTERNAME.azurehdinsight.cn

  2. 选择“服务”>“Kafka”。 选择“配置”选项卡。

  3. 将以下配置行添加到底部的 kafka-env template 节。 选择“保存”。

    # Configure Kafka to advertise IP addresses instead of FQDN
    IP_ADDRESS=$(hostname -i)
    echo advertised.listeners=$IP_ADDRESS
    sed -i.bak -e '/advertised/{/advertised@/!d;}' /usr/hdp/current/kafka-broker/conf/server.properties
    echo "advertised.listeners=PLAINTEXT://$IP_ADDRESS:9092" >> /usr/hdp/current/kafka-broker/conf/server.properties
    
  4. 在“保存配置”屏幕上输入备注,然后选择“保存”。

  5. 如果收到配置警告,请选择“仍然继续”。

  6. 在“保存配置更改”时,选择“确定”。

  7. 在“需要重新启动”通知中,选择“重新启动”>“重新启动所有受影响的组件”。 然后选择“确认全部重新启动”。

    Screenshot that shows the Apache Ambari option to restart all affected.

将 Kafka 配置为侦听所有网络接口

  1. 不要关闭“服务”>“Kafka”下的“配置”选项卡。 在“Kafka 代理”部分,将“侦听器”属性设置为 PLAINTEXT://0.0.0.0:9092
  2. 选择“保存”。
  3. 选择“重新启动”>“确认全部重新启动”。

记下主要群集的代理 IP 地址和 Zookeeper 地址

  1. 在 Ambari 仪表板上选择“主机”。

  2. 记下代理和 Zookeeper 的 IP 地址。 代理节点主机名的前两个字母为 wn,Zookeeper 节点主机名的前两个字母为 zk

    Screenshot that shows the Apache Ambari view node i p addresses.

  3. 对第二个群集 kafka-secondary-cluster 重复上述三个步骤:配置 IP 播发、设置侦听器并记下代理和 Zookeeper 的 IP 地址。

创建主题

  1. 使用 SSH 连接到主要群集:

    ssh sshuser@PRIMARYCLUSTER-ssh.azurehdinsight.cn
    

    sshuser 替换为创建群集时所用的 SSH 用户名。 将 PRIMARYCLUSTER 替换为创建群集时所用的基名称。

    有关详细信息,请参阅 将 SSH 与 HDInsight 配合使用

  2. 使用以下命令,为主要群集的 Apache Zookeeper 主机和 Broker 主机创建两个环境变量。 将类似 ZOOKEEPER_IP_ADDRESS1 的字符串替换为前面记下的实际 IP 地址,例如 10.23.0.1110.23.0.7。 对 BROKER_IP_ADDRESS1 也是如此。 如果对自定义 DNS 服务器使用 FQDN 解析,请遵循这些步骤获取代理和 zookeeper 名称。

    # get the ZooKeeper hosts for the primary cluster
    export PRIMARY_ZKHOSTS='ZOOKEEPER_IP_ADDRESS1:2181, ZOOKEEPER_IP_ADDRESS2:2181, ZOOKEEPER_IP_ADDRESS3:2181'
    
    # get the broker hosts for the primary cluster
    export PRIMARY_BROKERHOSTS='BROKER_IP_ADDRESS1:9092,BROKER_IP_ADDRESS2:9092,BROKER_IP_ADDRESS2:9092'
    
  3. 若要创建名为 testtopic 的主题,请使用以下命令:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 8 --topic testtopic --zookeeper $PRIMARY_ZKHOSTS
    
  4. 使用以下命令验证是否已创建该主题:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $PRIMARY_ZKHOSTS
    

    响应包含 testtopic

  5. 使用以下命令查看此(主要)群集的 Broker 主机信息:

    echo $PRIMARY_BROKERHOSTS
    

    这会返回类似于以下文本的信息:

    10.23.0.11:9092,10.23.0.7:9092,10.23.0.9:9092

    请保存此信息。 在下一部分中使用。

配置镜像

  1. 使用不同的 SSH 会话连接到辅助群集:

    ssh sshuser@SECONDARYCLUSTER-ssh.azurehdinsight.cn
    

    sshuser 替换为创建群集时所用的 SSH 用户名。 将 SECONDARYCLUSTER 替换为创建群集时所用的名称。

    有关详细信息,请参阅 将 SSH 与 HDInsight 配合使用

  2. 使用一个文件来配置与主群集的通信。 若要创建文件,请使用以下命令:

    nano consumer.properties
    

    将以下文本用作 consumer.properties 文件的内容:

    bootstrap.servers=PRIMARY_BROKERHOSTS
    group.id=mirrorgroup
    

    PRIMARY_BROKERHOSTS 替换为主要群集的代理主机 IP 地址。

    此文件说明从 Kafka 主要群集读取记录时要使用的使用者信息。 有关详细信息,请参阅 kafka.apache.org 中的使用者配置

    若要保存文件,请按 Ctrl + X,按 Y,然后按 Enter。

  3. 在配置与辅助群集通信的生成者之前,请为辅助群集的代理 IP 地址设置一个变量。 使用以下命令创建此变量:

    export SECONDARY_BROKERHOSTS='BROKER_IP_ADDRESS1:9092,BROKER_IP_ADDRESS2:9092,BROKER_IP_ADDRESS2:9092'
    

    命令 echo $SECONDARY_BROKERHOSTS 返回的信息应类似于以下文本:

    10.23.0.14:9092,10.23.0.4:9092,10.23.0.12:9092

  4. 使用 producer.properties 文件与辅助群集通信。 若要创建文件,请使用以下命令:

    nano producer.properties
    

    将以下文本用作 producer.properties 文件的内容:

    bootstrap.servers=SECONDARY_BROKERHOSTS
    compression.type=none
    

    SECONDARY_BROKERHOSTS 替换为在上一步骤中使用的代理 IP 地址。

    有关详细信息,请参阅 kafka.apache.org 中的生产者配置

  5. 使用以下命令创建一个环境变量,其中包含辅助群集的 Zookeeper 主机 IP 地址:

    # get the ZooKeeper hosts for the secondary cluster
    export SECONDARY_ZKHOSTS='ZOOKEEPER_IP_ADDRESS1:2181,ZOOKEEPER_IP_ADDRESS2:2181,ZOOKEEPER_IP_ADDRESS3:2181'
    
  6. Kafka 在 HDInsight 上的默认配置不允许自动创建主题。 在开始镜像过程之前,你必须使用以下选项之一:

    • 在辅助群集上创建主题:此选项还允许设置分区和复制因子的数目。

      可以使用以下命令提前创建新的主题:

      /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 8 --topic testtopic --zookeeper $SECONDARY_ZKHOSTS
      

      testtopic 替换为要创建的主题的名称。

    • 为自动创建主题配置群集:此选项允许 MirrorMaker 自动创建主题。 请注意,此选项在创建主题时采用分区数量或复制因子可能不同于主要主题。

      若要配置辅助群集以自动创建主题,请执行以下步骤:

      1. 转到辅助群集的 Ambari 仪表板:https://SECONDARYCLUSTERNAME.azurehdinsight.cn
      2. 选择“服务”>“Kafka”。 然后选择“配置”选项卡。
      3. 在“筛选器”字段中输入值 auto.create。 这将筛选的属性,并显示列表auto.create.topics.enable设置。
      4. auto.create.topics.enable 的值更改为 true,然后选择“保存”。 添加注释,然后选择保存
      5. 选择 Kafka 服务,选择重启,然后选择重启所有受影响的。 出现提示时,选择“确认全部重启”

      Screenshot that shows how to enable auto create topics in the kafka service.

启动 MirrorMaker

注意

本文包含一个对 Azure 不再使用的术语的引用。 在从软件中删除该术语后,我们会将其从本文中删除。

  1. 与辅助群集建立 SSH 连接后,使用以下命令启动 MirrorMaker 进程:

    /usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config consumer.properties --producer.config producer.properties --whitelist testtopic --num.streams 4
    

    此示例中使用的参数有:

    参数 说明
    --consumer.config 指定包含使用者属性的文件。 你可以使用这些属性创建一个从主 Kafka 群集中进行读取操作的使用者。
    --producer.config 指定包含创建者属性的文件。 你可以使用这些属性来创建一个生产者,生产者将写到第二个 Kafka 集群。
    --whitelist MirrorMaker 从主要群集复制到辅助群集的主题列表。
    --num.streams 要创建的使用者线程数。

    现在,辅助节点上的使用者正在等待接收消息。

  2. 与主要群集建立 SSH 连接后,使用以下命令启动生成者,并向主题发送消息:

    /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $PRIMARY_BROKERHOSTS --topic testtopic
    

    出现带有光标的空行时,请键入几条文本消息。 这些消息将发送到主要群集上的主题。 完成后,按 Ctrl+C 结束生产者进程。

  3. 从 SSH 连接到辅助集群,按 Ctrl+C 结束 MirrorMaker 进程。 它可能需要几秒钟时间结束进程。 若要验证是否已将主题和消息复制到辅助群集,请使用以下命令:

    /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $SECONDARY_BROKERHOSTS --topic testtopic --from-beginning
    

    主题列表现在包含 testtopic,该条目是在 MirrorMaster 将主题从主要群集镜像到辅助群集时创建的。 从主题检索到的消息与在主要群集上输入的消息相同。

删除群集

警告

HDInsight 群集是基于分钟按比例计费,而不管用户是否使用它们。 请务必在使用完群集之后将其删除。 请参阅如何删除 HDInsight 群集

本文中的步骤已在不同的 Azure 资源组中创建了群集。 若要删除创建的所有资源,可以删除创建的两个资源组:kafka-primary-rgkafka-secondary_rg。 删除资源组会删除遵循本文创建的所有资源,包括群集、虚拟网络和存储帐户。

后续步骤

本文已介绍如何使用 MirrorMaker 创建 Apache Kafka 群集的副本。 使用以下链接来发现与 Kafka 配合使用的其他方式: