Quickstart: Create Apache Kafka cluster in Azure HDInsight using Azure portal

Apache Kafka is an open-source, distributed streaming platform. It's often used as a message broker, as it provides functionality similar to a publish-subscribe message queue.

In this quickstart, you learn how to create an Apache Kafka cluster using the Azure portal. You also learn how to use the included utilities to send and receive messages with Apache Kafka. For in-depth explanations of the available configurations, see Set up clusters in HDInsight. For additional information on using the portal to create clusters, see Create clusters in the portal.

Warning

Billing for HDInsight clusters is prorated per minute, whether you use them or not. Be sure to delete your cluster after you finish using it. See how to delete an HDInsight cluster.

The Apache Kafka API can only be accessed by resources inside the same virtual network. In this quickstart, you access the cluster directly over SSH. To connect other services, networks, or virtual machines to Apache Kafka, you must first create a virtual network and then create the resources within that network. For more information, see the Connect to Apache Kafka using a virtual network document. For more general information on planning virtual networks for HDInsight, see Plan a virtual network for Azure HDInsight.
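
If you plan to connect other resources to Apache Kafka later, one way to pre-create a virtual network is with the Azure CLI. The following is a minimal sketch rather than part of this quickstart: the resource group name, network name, and address ranges are placeholders, and exact parameter names can vary between CLI versions.

    # Create a virtual network with one default subnet (placeholder names and ranges)
    az network vnet create \
        --resource-group MyResourceGroup \
        --name MyVNet \
        --address-prefixes 10.0.0.0/16 \
        --subnet-name default \
        --subnet-prefixes 10.0.0.0/24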

If you don't have an Azure subscription, create a trial account before you begin.

Prerequisites

An SSH client. For more information, see Connect to HDInsight (Apache Hadoop) using SSH.
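
To confirm that an SSH client is available on your machine, you can print its version from a terminal:

    ssh -V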

Create an Apache Kafka cluster

To create an Apache Kafka cluster on HDInsight, use the following steps:

  1. Sign in to the Azure portal.

  2. From the top menu, select + Create a resource.

    Azure portal create resource HDInsight

  3. Select Analytics > Azure HDInsight to go to the Create HDInsight cluster page.

  4. From the Basics tab, provide the following information:

    Subscription: From the drop-down list, select the Azure subscription that's used for the cluster.
    Resource group: Create a resource group or select an existing resource group. A resource group is a container of Azure components. In this case, the resource group contains the HDInsight cluster and the dependent Azure Storage account.
    Cluster name: Enter a globally unique name. The name can consist of up to 59 characters, including letters, numbers, and hyphens. The first and last characters of the name can't be hyphens.
    Region: From the drop-down list, select a region where the cluster is created. Choose a region closer to you for better performance.
    Cluster type: Select Select cluster type to open a list. From the list, select Kafka as the cluster type.
    Version: The default version for the cluster type is specified. Select from the drop-down list if you wish to specify a different version.
    Cluster login username and password: The default login name is admin. The password must be at least 10 characters in length and must contain at least one digit, one uppercase letter, one lowercase letter, and one non-alphanumeric character (except the characters ' " `). Make sure you don't provide common passwords such as "Pass@word1".
    Secure Shell (SSH) username: The default username is sshuser. You can provide another name for the SSH username.
    Use cluster login password for SSH: Select this check box to use the same password for the SSH user as the one you provided for the cluster login user.

    Azure portal create cluster basics

    Each Azure region (location) provides fault domains. A fault domain is a logical grouping of underlying hardware in an Azure data center. Each fault domain shares a common power source and network switch. The virtual machines and managed disks that implement the nodes within an HDInsight cluster are distributed across these fault domains. This architecture limits the potential impact of physical hardware failures.

    For high availability of data, select a region (location) that contains three fault domains. For information on the number of fault domains in a region, see the Availability of Linux virtual machines document.

    Select the Next: Storage >> tab to advance to the storage settings.

  5. From the Storage tab, provide the following values:

    Primary storage type: Use the default value Azure Storage.
    Selection method: Use the default value Select from list.
    Primary storage account: Use the drop-down list to select an existing storage account, or select Create new. If you create a new account, the name must be between 3 and 24 characters in length, and can include numbers and lowercase letters only.
    Container: Use the autopopulated value.

    HDInsight Linux get started provide cluster storage values

    Select the Security + networking tab.

  6. For this quickstart, leave the default security settings.

    If you would like to connect your cluster to a virtual network, select a virtual network from the Virtual network dropdown.

    Add cluster to a virtual network

    Select the Configuration + pricing tab.

  7. To guarantee availability of Apache Kafka on HDInsight, the Number of nodes entry for Worker node must be set to 3 or greater. The default value is 4.

    The Standard disks per worker node entry configures the scalability of Apache Kafka on HDInsight. Apache Kafka on HDInsight uses the local disks of the virtual machines in the cluster to store data. Apache Kafka is I/O heavy, so Azure Managed Disks are used to provide high throughput and more storage per node. The type of managed disk can be either Standard (HDD) or Premium (SSD). The type of disk depends on the VM size used by the worker nodes (the Apache Kafka brokers). Premium disks are used automatically with DS and GS series VMs. All other VM types use Standard.

    Set the Apache Kafka cluster size

    Select the Review + create tab.

  8. Review the configuration for the cluster. Change any settings that are incorrect. Finally, select Create to create the cluster.

    Kafka cluster configuration summary

    It can take up to 20 minutes to create the cluster.
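
    If you have the Azure CLI installed, one way to check on the cluster is to query its provisioning state from a terminal. This is an optional sketch; CLUSTERNAME and RESOURCEGROUP are placeholders for your cluster name and resource group, and it assumes the az hdinsight command group is available in your CLI version:

    az hdinsight show --name CLUSTERNAME --resource-group RESOURCEGROUP --query properties.clusterState --output tsv

    The command typically reports Running once the cluster is ready to use.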

Connect to the cluster

  1. Use the ssh command to connect to your cluster. Edit the following command by replacing CLUSTERNAME with the name of your cluster, and then enter the command:

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.cn
    
  2. When prompted, enter the password for the SSH user.

    Once connected, you see information similar to the following text:

    Authorized uses only. All activity may be monitored and reported.
    Welcome to Ubuntu 16.04.4 LTS (GNU/Linux 4.13.0-1011-azure x86_64)
    
     * Documentation:  https://help.ubuntu.com
     * Management:     https://landscape.canonical.com
     * Support:        https://ubuntu.com/advantage
    
      Get cloud support with Ubuntu Advantage Cloud Guest:
        https://www.ubuntu.com/business/services/cloud
    
    83 packages can be updated.
    37 updates are security updates.
    
    
    Welcome to Apache Kafka on HDInsight.
    
    Last login: Thu Mar 29 13:25:27 2018 from 108.252.109.241
    

Get the Apache Zookeeper and Broker host information

When working with Kafka, you must know the Apache Zookeeper and Broker hosts. These hosts are used with the Apache Kafka API and many of the utilities that ship with Kafka.

In this section, you get the host information from the Apache Ambari REST API on the cluster.
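
As an optional first check, you can confirm that the Ambari REST API responds from inside the cluster. A minimal sketch assuming the default admin login; curl prompts for the cluster login password:

    curl -u admin -sS -G "http://headnodehost:8080/api/v1/clusters"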

  1. Install jq, a command-line JSON processor. This utility is used to parse JSON documents and is useful in parsing the host information. From the open SSH connection, enter the following command to install jq:

    sudo apt -y install jq
    
  2. Set up a password variable. Replace PASSWORD with the cluster login password, then enter the command:

    export password='PASSWORD'
    
  3. Extract the correctly cased cluster name. The actual casing of the cluster name may be different than you expect, depending on how the cluster was created. This command obtains the actual casing and stores it in a variable. Enter the following command:

    export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    

    Note

    If you're doing this process from outside the cluster, there's a different procedure for storing the cluster name. Get the cluster name in lowercase from the Azure portal. Then, substitute the cluster name for <clustername> in the following command and execute it: export clusterName='<clustername>'.

  4. To set an environment variable with Zookeeper host information, use the command below. The command retrieves all Zookeeper hosts, then returns only the first two entries. This provides some redundancy in case one host is unreachable.

    export KAFKAZKHOSTS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.cn/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2);
    

    Note

    This command requires Ambari access. If your cluster is behind an NSG, run this command from a machine that can access Ambari.

  5. To verify that the environment variable is set correctly, use the following command:

    echo $KAFKAZKHOSTS
    

    This command returns information similar to the following text:

    zk0-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.chinacloudapp.cn:2181,zk2-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.chinacloudapp.cn:2181

  6. To set an environment variable with Apache Kafka broker host information, use the following command:

    export KAFKABROKERS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.cn/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
    

    Note

    This command requires Ambari access. If your cluster is behind an NSG, run this command from a machine that can access Ambari.

  7. To verify that the environment variable is set correctly, use the following command:

    echo $KAFKABROKERS
    

    This command returns information similar to the following text:

    wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.chinacloudapp.cn:9092,wn0-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.chinacloudapp.cn:9092
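
    As an optional sanity check, you can confirm that the first broker in the list accepts connections on the Kafka port. A minimal sketch, assuming the nc (netcat) utility is available on the head node:

    nc -vz $(echo $KAFKABROKERS | cut -d',' -f1 | cut -d':' -f1) 9092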

Manage Apache Kafka topics

Kafka stores streams of data in topics. You can use the kafka-topics.sh utility to manage topics.

  • To create a topic, use the following command in the SSH connection (to verify the result afterward, see the describe example after this list):

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic test --zookeeper $KAFKAZKHOSTS

    This command connects to Zookeeper using the host information stored in $KAFKAZKHOSTS. It then creates an Apache Kafka topic named test.

    • Data stored in this topic is partitioned across eight partitions.

    • Each partition is replicated across three worker nodes in the cluster.

    If you created the cluster in an Azure region that provides three fault domains, use a replication factor of 3. Otherwise, use a replication factor of 4.

    In regions with three fault domains, a replication factor of 3 allows replicas to be spread across the fault domains. In regions with two fault domains, a replication factor of 4 spreads the replicas evenly across the domains.

    For information on the number of fault domains in a region, see the Availability of Linux virtual machines document.

    Apache Kafka is not aware of Azure fault domains. When creating partition replicas for topics, it may not distribute replicas properly for high availability.

    To ensure high availability, use the Apache Kafka partition rebalance tool. This tool must be run from an SSH connection to the head node of your Apache Kafka cluster.

    For the highest availability of your Apache Kafka data, you should rebalance the partition replicas for your topic when:

    • You create a new topic or partition

    • You scale up a cluster

  • To list topics, use the following command:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $KAFKAZKHOSTS
    

    This command lists the topics available on the Apache Kafka cluster.

  • To delete a topic, use the following command:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic topicname --zookeeper $KAFKAZKHOSTS
    

    This command deletes the topic named topicname.

    Warning

    If you delete the test topic created earlier, then you must recreate it. It's used by steps later in this document.
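
To see how a topic's partitions and replicas were assigned to brokers, you can describe the topic with the same utility. A quick check that assumes the test topic created earlier still exists:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --describe --topic test --zookeeper $KAFKAZKHOSTS

The output lists the leader broker and the replica assignment for each partition.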

For more information on the commands available with the kafka-topics.sh utility, use the following command:

/usr/hdp/current/kafka-broker/bin/kafka-topics.sh

Produce and consume records

Kafka stores records in topics. Records are produced by producers and consumed by consumers. Producers and consumers communicate with the Kafka broker service. Each worker node in your HDInsight cluster is an Apache Kafka broker host.

To store records into the test topic you created earlier, and then read them using a consumer, use the following steps:

  1. To write records to the topic, use the kafka-console-producer.sh utility from the SSH connection:

    /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKABROKERS --topic test
    

    After this command, you arrive at an empty line.

  2. Type a text message on the empty line and hit enter. Enter a few messages this way, and then use Ctrl + C to return to the normal prompt. Each line is sent as a separate record to the Apache Kafka topic. (A non-interactive variant is sketched after these steps.)

  3. To read records from the topic, use the kafka-console-consumer.sh utility from the SSH connection:

    /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $KAFKABROKERS --topic test --from-beginning
    

    This command retrieves the records from the topic and displays them. Using --from-beginning tells the consumer to start from the beginning of the stream, so all records are retrieved.

    If you're using an older version of Kafka, replace --bootstrap-server $KAFKABROKERS with --zookeeper $KAFKAZKHOSTS.

  4. Use Ctrl + C to stop the consumer.
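
The same console utilities can also be driven non-interactively, which is convenient for quick smoke tests. A minimal sketch that assumes the test topic exists; --max-messages and --timeout-ms bound how long the consumer runs:

    # Send three records to the test topic without an interactive prompt
    printf 'record one\nrecord two\nrecord three\n' | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKABROKERS --topic test

    # Read up to three records from the beginning of the topic, then exit
    /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $KAFKABROKERS --topic test --from-beginning --max-messages 3 --timeout-ms 10000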

You can also create producers and consumers programmatically. For an example of using this API, see the Apache Kafka Producer and Consumer API with HDInsight document.

Clean up resources

To clean up the resources created by this quickstart, you can delete the resource group. Deleting the resource group also deletes the associated HDInsight cluster, and any other resources associated with the resource group.

To remove the resource group using the Azure portal (an Azure CLI alternative is sketched after these steps):

  1. In the Azure portal, expand the menu on the left side to open the menu of services, and then choose Resource groups to display the list of your resource groups.
  2. Locate the resource group to delete, and then right-click the More button (...) on the right side of the listing.
  3. Select Delete resource group, and then confirm.
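
If you prefer the command line, the resource group can also be removed with the Azure CLI. A minimal sketch; RESOURCEGROUP is a placeholder for the name of your resource group:

    az group delete --name RESOURCEGROUP --yes --no-wait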

Warning

Deleting an Apache Kafka cluster on HDInsight deletes any data stored in Kafka.

Next steps