Use MirrorMaker to replicate Apache Kafka topics with Kafka on HDInsight

Learn how to use Apache Kafka's mirroring feature to replicate topics to a secondary cluster. Mirroring can be run as a continuous process, or used intermittently as a method of migrating data from one cluster to another.

In this example, mirroring is used to replicate topics between two HDInsight clusters. The clusters are in different virtual networks in different datacenters.

Warning

Mirroring should not be considered a means to achieve fault tolerance. The offsets of items within a topic differ between the primary and secondary clusters, so clients cannot use the two interchangeably.

If you are concerned about fault tolerance, set replication for the topics within your cluster. For more information, see Get started with Apache Kafka on HDInsight.

How Apache Kafka mirroring works

Mirroring works by using the MirrorMaker tool (part of Apache Kafka) to consume records from topics on the primary cluster and then create a local copy on the secondary cluster. MirrorMaker uses one or more consumers that read from the primary cluster, and a producer that writes to the local (secondary) cluster.

The most useful mirroring setup for disaster recovery uses Kafka clusters in different Azure regions. To achieve this, the virtual networks where the clusters reside are peered together.

The following diagram illustrates the mirroring process and how the communication flows between clusters:

Diagram of the mirroring process

The primary and secondary clusters can differ in the number of nodes and partitions, and offsets within the topics also differ. Mirroring maintains the key value used for partitioning, so record order is preserved on a per-key basis.

Mirroring across network boundaries

If you need to mirror between Kafka clusters in different networks, there are the following additional considerations:

  • Gateways: The networks must be able to communicate at the TCP/IP level.

  • Server addressing: You can choose to address your cluster nodes by IP address or by fully qualified domain name.

    • IP addresses: If you configure your Kafka clusters to use IP address advertising, you can proceed with the mirroring setup using the IP addresses of the broker and Zookeeper nodes.

    • Domain names: If you don't configure your Kafka clusters for IP address advertising, the clusters must be able to connect to each other by using fully qualified domain names (FQDNs). This requires a Domain Name System (DNS) server in each network that is configured to forward requests to the other network. When creating an Azure virtual network, instead of using the automatic DNS provided with the network, you must specify a custom DNS server and the IP address for the server. After the virtual network has been created, you must then create an Azure virtual machine that uses that IP address, and install and configure DNS software on it.

    Warning

    Create and configure the custom DNS server before installing HDInsight into the virtual network. No additional configuration is required for HDInsight to use the DNS server configured for the virtual network.

有关连接两个 Azure 虚拟网络的详细信息,请参阅配置 VNet 到 VNet 的连接For more information on connecting two Azure Virtual Networks, see Configure a VNet-to-VNet connection.
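Before touching any Kafka configuration, the gateway requirement above can be sanity-checked from a node in one network against the other. The snippet below is a minimal sketch (the IP addresses are hypothetical placeholders, and `check_port` is a helper introduced here); it uses bash's built-in `/dev/tcp` redirection to test whether a broker or Zookeeper port is reachable across the peering or VPN:

```shell
# Minimal TCP reachability check using bash's /dev/tcp redirection.
# The IP addresses below are hypothetical; substitute your own nodes.
check_port() {
  local host=$1 port=$2
  # Attempt to open a TCP connection, bounded by a 3-second timeout.
  if timeout 3 bash -c "exec 3<>/dev/tcp/$host/$port" 2>/dev/null; then
    echo "$host:$port reachable"
  else
    echo "$host:$port unreachable"
  fi
}

check_port 10.23.0.11 2181   # a Zookeeper node in the peered network
check_port 10.23.0.14 9092   # a Kafka broker in the peered network
```

If a port reports unreachable, fix the network peering or DNS setup before continuing; the mirroring steps below assume this connectivity exists.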

Mirroring architecture

This architecture features two clusters in different resource groups and virtual networks: a primary and a secondary.

Creation steps

  1. Create two new resource groups:

    Resource group Location
    kafka-primary-rg China East
    kafka-secondary-rg China North
  2. Create a new virtual network kafka-primary-vnet in kafka-primary-rg. Leave the default settings.

  3. Create a new virtual network kafka-secondary-vnet in kafka-secondary-rg, also with the default settings.

  4. Create two new Kafka clusters:

    Cluster name Resource group Virtual network Storage account
    kafka-primary-cluster kafka-primary-rg kafka-primary-vnet kafkaprimarystorage
    kafka-secondary-cluster kafka-secondary-rg kafka-secondary-vnet kafkasecondarystorage
  5. Create virtual network peerings. This step creates two peerings: one from kafka-primary-vnet to kafka-secondary-vnet and one back from kafka-secondary-vnet to kafka-primary-vnet.

    1. Select the kafka-primary-vnet virtual network.

    2. Under Settings, select Peerings.

    3. Select Add.

    4. On the Add peering screen, enter the details as shown in the following screenshot.

      Add VNet peering

Configure IP advertising

Configure IP advertising so that a client can connect using broker IP addresses instead of domain names.

  1. Go to the Ambari dashboard for the primary cluster: https://PRIMARYCLUSTERNAME.azurehdinsight.cn.

  2. Select Services > Kafka. Select the Configs tab.

  3. Add the following configuration lines to the bottom kafka-env template section, and then select Save.

     ```
     # Configure Kafka to advertise IP addresses instead of FQDN
     IP_ADDRESS=$(hostname -i)
     echo advertised.listeners=$IP_ADDRESS
     sed -i.bak -e '/advertised/{/advertised@/!d;}' /usr/hdp/current/kafka-broker/conf/server.properties
     echo "advertised.listeners=PLAINTEXT://$IP_ADDRESS:9092" >> /usr/hdp/current/kafka-broker/conf/server.properties
     ```
    
    1. Enter a note on the Save Configuration screen, and then select Save.

    2. If you're prompted with a configuration warning, select Proceed Anyway.

    3. Select OK on the Save Configuration Changes screen.

    4. In the Restart Required notification, select Restart > Restart All Affected, and then select Confirm Restart All.

      Restart Kafka nodes

Configure Kafka to listen on all network interfaces.

1. Stay on the **Configs** tab under **Services** > **Kafka**. In the **Kafka Broker** section, set the **listeners** property to `PLAINTEXT://0.0.0.0:9092`.
1. Select **Save**.
1. Select **Restart**, and then **Confirm Restart All**.

Record the broker IP addresses and Zookeeper addresses for the primary cluster.

1. Select **Hosts** on the Ambari dashboard.
1. Make a note of the IP addresses for the brokers and Zookeeper nodes. The broker nodes have **wn** as the first two letters of the host name, and the Zookeeper nodes have **zk** as the first two letters of the host name.

    ![View IP addresses](./media/apache-kafka-mirroring/view-node-ip-addresses2.png)
  1. Repeat the previous three steps for the second cluster, kafka-secondary-cluster: configure IP advertising, set listeners, and make a note of the broker and Zookeeper IP addresses.
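To confirm the IP advertising change took effect after the restart, you can inspect server.properties on a broker node. This check is a suggestion, not part of the official steps; the configuration path is the one used by the kafka-env script above:

```shell
# Print the advertised listener line from the broker config (run on a broker node).
CONF=/usr/hdp/current/kafka-broker/conf/server.properties
grep '^advertised.listeners' "$CONF"
# A correctly configured broker shows a line of the form
# advertised.listeners=PLAINTEXT://<broker-ip>:9092
```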

Create topics

  1. Connect to the primary cluster using SSH:

    ssh sshuser@PRIMARYCLUSTER-ssh.azurehdinsight.cn


    Replace sshuser with the SSH user name used when creating the cluster. Replace PRIMARYCLUSTER with the base name used when creating the cluster.

    For information, see Use SSH with HDInsight.

  2. Use the following command to create a variable with the Apache Zookeeper hosts for the primary cluster. Strings like ZOOKEEPER_IP_ADDRESS1 must be replaced with the actual IP addresses recorded earlier, such as 10.23.0.11 and 10.23.0.7. If you're using FQDN resolution with a custom DNS server, follow these steps to get the broker and Zookeeper names:

    # get the zookeeper hosts for the primary cluster
    export PRIMARY_ZKHOSTS='ZOOKEEPER_IP_ADDRESS1:2181,ZOOKEEPER_IP_ADDRESS2:2181,ZOOKEEPER_IP_ADDRESS3:2181'
    
  3. To create a topic named testtopic, use the following command:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 8 --topic testtopic --zookeeper $PRIMARY_ZKHOSTS
    
  4. Use the following command to verify that the topic was created:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $PRIMARY_ZKHOSTS
    

    The response contains testtopic.

  5. Use the following command to view the Zookeeper host information for this (the primary) cluster:

    echo $PRIMARY_ZKHOSTS
    

    This command returns information similar to the following text:

    10.23.0.11:2181,10.23.0.7:2181,10.23.0.9:2181

    Save this information. It's used in the next section.
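Typing colon-separated connection strings like the one above by hand is error-prone. As a convenience, a sketch like the following (plain bash, nothing HDInsight-specific; the IPs are the hypothetical examples used above) builds the string from an array of addresses:

```shell
# Join an array of Zookeeper host IPs into a connection string.
# The IPs are hypothetical placeholders; use the addresses recorded from Ambari.
ZK_IPS=(10.23.0.11 10.23.0.7 10.23.0.9)

# Append ":2181" to each element, then join the array with commas.
PRIMARY_ZKHOSTS=$(IFS=,; echo "${ZK_IPS[*]/%/:2181}")

echo "$PRIMARY_ZKHOSTS"   # 10.23.0.11:2181,10.23.0.7:2181,10.23.0.9:2181
```

The same pattern works for the broker host strings used later (substitute port 9092).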

Configure mirroring

  1. Connect to the secondary cluster using a different SSH session:

    ssh sshuser@SECONDARYCLUSTER-ssh.azurehdinsight.cn
    

    Replace sshuser with the SSH user name used when creating the cluster. Replace SECONDARYCLUSTER with the name used when creating the cluster.

    For information, see Use SSH with HDInsight.

  2. A consumer.properties file is used to configure communication with the primary cluster. To create the file, use the following command:

    nano consumer.properties
    

    Use the following text as the contents of the consumer.properties file:

    zookeeper.connect=PRIMARY_ZKHOSTS
    group.id=mirrorgroup
    

    Replace PRIMARY_ZKHOSTS with the Zookeeper IP addresses from the primary cluster.

    This file describes the consumer information to use when reading from the primary Kafka cluster. For more information on consumer configuration, see Consumer Configs at kafka.apache.org.

    To save the file, use Ctrl+X, Y, and then Enter.

  3. Before configuring the producer that communicates with the secondary cluster, set up a variable with the broker IP addresses of the secondary cluster. Use the following command to create this variable:

    export SECONDARY_BROKERHOSTS='BROKER_IP_ADDRESS1:9092,BROKER_IP_ADDRESS2:9092,BROKER_IP_ADDRESS3:9092'
    

    The command echo $SECONDARY_BROKERHOSTS should return information similar to the following text:

    10.23.0.14:9092,10.23.0.4:9092,10.23.0.12:9092

  4. A producer.properties file is used to communicate with the secondary cluster. To create the file, use the following command:

    nano producer.properties
    

    Use the following text as the contents of the producer.properties file:

    bootstrap.servers=SECONDARY_BROKERHOSTS
    compression.type=none
    

    Replace SECONDARY_BROKERHOSTS with the broker IP addresses used in the previous step.

    For more information on producer configuration, see Producer Configs at kafka.apache.org.

  5. Use the following command to create an environment variable with the IP addresses of the Zookeeper hosts for the secondary cluster:

    # get the zookeeper hosts for the secondary cluster
    export SECONDARY_ZKHOSTS='ZOOKEEPER_IP_ADDRESS1:2181,ZOOKEEPER_IP_ADDRESS2:2181,ZOOKEEPER_IP_ADDRESS3:2181'
    
  6. The default configuration for Kafka on HDInsight doesn't allow the automatic creation of topics. You must use one of the following options before starting the mirroring process:

    • Create the topics on the secondary cluster: This option also allows you to set the number of partitions and the replication factor.

      You can create topics ahead of time by using the following command:

      /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 8 --topic testtopic --zookeeper $SECONDARY_ZKHOSTS
      

      Replace testtopic with the name of the topic to create.

    • Configure the cluster for automatic topic creation: This option allows MirrorMaker to create topics automatically; however, it may create them with a different number of partitions or a different replication factor than the primary topic.

      To configure the secondary cluster to create topics automatically, perform these steps:

      1. Go to the Ambari dashboard for the secondary cluster: https://SECONDARYCLUSTERNAME.azurehdinsight.cn.
      2. Select Services > Kafka. Select the Configs tab.
      3. In the Filter field, enter a value of auto.create. This filters the list of properties and displays the auto.create.topics.enable setting.
      4. Change the value of auto.create.topics.enable to true, and then select Save. Add a note, and then select Save again.
      5. Select the Kafka service, select Restart, and then select Restart all affected. When prompted, select Confirm restart all.

      Configure automatic topic creation
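If you choose the first option and have more than a handful of topics, they can be pre-created in a loop. The following is a sketch, not part of the official walkthrough: it assumes $PRIMARY_ZKHOSTS and $SECONDARY_ZKHOSTS are exported as described above, and it applies one fixed partition count and replication factor to every topic (adjust per topic if yours differ):

```shell
# Sketch: create every non-internal topic from the primary on the secondary.
# Assumes $PRIMARY_ZKHOSTS and $SECONDARY_ZKHOSTS are already exported.
kafka_topics() {
  /usr/hdp/current/kafka-broker/bin/kafka-topics.sh "$@"
}

copy_topics() {
  # List topics on the primary, skipping Kafka-internal topics
  # such as __consumer_offsets, and create each on the secondary.
  kafka_topics --list --zookeeper "$PRIMARY_ZKHOSTS" | grep -v '^__' |
  while read -r topic; do
    kafka_topics --create --replication-factor 2 --partitions 8 \
      --topic "$topic" --zookeeper "$SECONDARY_ZKHOSTS"
  done
}

copy_topics
```

Pre-creating this way keeps partition counts identical on both sides, which the auto-creation option does not guarantee.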

Start MirrorMaker

  1. From the SSH connection to the secondary cluster, use the following command to start the MirrorMaker process:

    /usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config consumer.properties --producer.config producer.properties --whitelist testtopic --num.streams 4
    

    The parameters used in this example are:

    Parameter Description
    --consumer.config Specifies the file that contains consumer properties. These properties are used to create a consumer that reads from the primary Kafka cluster.
    --producer.config Specifies the file that contains producer properties. These properties are used to create a producer that writes to the secondary Kafka cluster.
    --whitelist A list of topics that MirrorMaker replicates from the primary cluster to the secondary cluster.
    --num.streams The number of consumer threads to create.

    The consumer on the secondary node is now waiting to receive messages.

  2. From the SSH connection to the primary cluster, use the following command to start a producer and send messages to the topic:

    export PRIMARY_BROKERHOSTS=BROKER_IP_ADDRESS1:9092,BROKER_IP_ADDRESS2:9092,BROKER_IP_ADDRESS3:9092
    /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $PRIMARY_BROKERHOSTS --topic testtopic
    

    When you arrive at a blank line with a cursor, type in a few text messages. The messages are sent to the topic on the primary cluster. When done, use Ctrl+C to end the producer process.

  3. From the SSH connection to the secondary cluster, use Ctrl+C to end the MirrorMaker process. It may take several seconds to end the process. To verify that the messages were replicated to the secondary cluster, use the following command:

    /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $SECONDARY_BROKERHOSTS --topic testtopic --from-beginning
    

    The list of topics now includes testtopic, which was created when MirrorMaker mirrored the topic from the primary cluster to the secondary. The messages retrieved from the topic are the same as the ones you entered on the primary cluster.
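In this walkthrough, MirrorMaker ran in the foreground so it could be stopped with Ctrl+C. For ongoing replication, you would keep it running after the SSH session ends. The following is a sketch of one common pattern; `start_bg` and the log/pid file names are conveniences introduced here, not part of Kafka:

```shell
# Sketch: run a long-lived command detached from the terminal,
# logging its output and recording its process ID.
start_bg() {
  nohup "$@" > mirrormaker.log 2>&1 &
  echo $! > mirrormaker.pid
}

start_bg /usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.MirrorMaker \
  --consumer.config consumer.properties --producer.config producer.properties \
  --whitelist testtopic --num.streams 4

# Later, stop the mirror with: kill "$(cat mirrormaker.pid)"
```

For production use, a supervisor such as systemd is a more robust choice than nohup, since it can restart MirrorMaker if it exits.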

Delete the cluster

Warning

Billing for HDInsight clusters is prorated per minute, whether you use them or not. Be sure to delete your cluster after you finish using it. See how to delete an HDInsight cluster.

The steps in this document created clusters in different Azure resource groups. To delete all of the resources created, you can delete the two resource groups: kafka-primary-rg and kafka-secondary-rg. Deleting the resource groups removes all of the resources created by following this document, including clusters, virtual networks, and storage accounts.

Next steps

In this document, you learned how to use MirrorMaker to create a replica of an Apache Kafka cluster. Use the following links to discover other ways to work with Kafka: