Tutorial: Use the Apache Kafka Producer and Consumer APIs

Learn how to use the Apache Kafka Producer and Consumer APIs with Kafka on HDInsight.

The Kafka Producer API allows applications to send streams of data to the Kafka cluster. The Kafka Consumer API allows applications to read streams of data from the cluster.

In this tutorial, you learn how to:

  • Understand the code
  • Build and deploy the application
  • Run the application on the cluster

For more information on these APIs, see the Apache documentation on the Producer API and Consumer API.

Prerequisites

Understand the code

The example application is located at https://github.com/Azure-Samples/hdinsight-kafka-java-get-started, in the Producer-Consumer subdirectory. The application consists primarily of the following files:

  • pom.xml: This file defines the project dependencies, Java version, and packaging methods.
  • Producer.java: This file sends random sentences to Kafka using the producer API.
  • Consumer.java: This file uses the consumer API to read data from Kafka and emit it to STDOUT.
  • AdminClientWrapper.java: This file uses the admin API to create, describe, and delete Kafka topics.
  • Run.java: The command-line interface used to run the producer and consumer code.

Pom.xml

The important things to understand in the pom.xml file are:

  • Dependencies: This project relies on the Kafka producer and consumer APIs, which are provided by the kafka-clients package. The following XML code defines this dependency:

    <!-- Kafka client for producer/consumer operations -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>${kafka.version}</version>
    </dependency>
    

    The ${kafka.version} entry is declared in the <properties>..</properties> section of pom.xml, and is configured to the Kafka version of the HDInsight cluster.
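
    For illustration, that section might look like the following sketch; the version value is a placeholder, since the real value must match your cluster's Kafka version:

    <properties>
      <!-- Placeholder: set to the Kafka version of your HDInsight cluster -->
      <kafka.version>1.1.0</kafka.version>
    </properties>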

  • Plugins: Maven plugins provide various capabilities. In this project, the following plugins are used:

    • maven-compiler-plugin: Used to set the Java version used by the project to 8. This is the version of Java used by HDInsight 3.6.
    • maven-shade-plugin: Used to generate an uber jar that contains this application as well as any dependencies. It is also used to set the entry point of the application, so that you can run the jar file directly without having to specify the main class. A sketch of a typical configuration follows this list.
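
    The exact plugin configuration lives in the repository's pom.xml; as a rough, hedged sketch, it follows this common Maven pattern (the main class name here is a placeholder, not taken from the source):

    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <configuration>
        <source>1.8</source>
        <target>1.8</target>
      </configuration>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <executions>
        <execution>
          <phase>package</phase>
          <goals><goal>shade</goal></goals>
          <configuration>
            <transformers>
              <!-- Placeholder main class; the repository's pom.xml names the real one -->
              <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                <mainClass>com.example.Run</mainClass>
              </transformer>
            </transformers>
          </configuration>
        </execution>
      </executions>
    </plugin>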

Producer.java

The producer communicates with the Kafka broker hosts (worker nodes) and sends data to a Kafka topic. The following code snippet, from the Producer.java file in the GitHub repository, shows how to set the producer properties:

Properties properties = new Properties();
// Set the brokers (bootstrap servers)
properties.setProperty("bootstrap.servers", brokers);
// Set how to serialize key/value pairs
properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
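
With the producer configured, sending a record is a single call. The following is a minimal, hedged sketch rather than the repository's exact code (the actual Producer.java builds random sentences in a loop; the topic name and message here are illustrative):

// Send one record to the topic, then flush and release resources.
// (Uses ProducerRecord, already imported in Producer.java.)
producer.send(new ProducerRecord<String, String>("myTest", "Hello, Kafka!"));
producer.flush();
producer.close();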

Consumer.java

The consumer communicates with the Kafka broker hosts (worker nodes) and reads records in a loop. The following code snippet from the Consumer.java file sets the consumer properties:

KafkaConsumer<String, String> consumer;
// Configure the consumer
Properties properties = new Properties();
// Point it to the brokers
properties.setProperty("bootstrap.servers", brokers);
// Set the consumer group (all consumers must belong to a group).
properties.setProperty("group.id", groupId);
// Set how to deserialize key/value pairs
properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
// When a group is first created, it has no offset stored to start reading from. This tells it to start
// with the earliest record in the stream.
properties.setProperty("auto.offset.reset","earliest");

consumer = new KafkaConsumer<>(properties);

In this code, the consumer is configured to read from the start of the topic (auto.offset.reset is set to earliest).
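
After configuration, the consumer subscribes to a topic and polls for records. A minimal sketch of the read loop, simplified from what Consumer.java does (the topic name is illustrative, and poll(long) matches the older kafka-clients API targeted here):

// Subscribe to the topic and drain whatever records are available.
// (Uses java.util.Arrays and the Kafka client classes imported in Consumer.java.)
consumer.subscribe(Arrays.asList("myTest"));
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
    System.out.println(record.value());
}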

Run.java

The Run.java file provides a command-line interface that runs either the producer or consumer code. You must provide the Kafka broker host information as a parameter. You can optionally include a group ID value, which is used by the consumer process. If you create multiple consumer instances using the same group ID, they load balance reading from the topic.
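
Based on the commands used later in this tutorial, the invocations follow this general pattern (see Run.java in the repository for the exact argument handling):

java -jar kafka-producer-consumer.jar create <topicname> <brokerhosts>
java -jar kafka-producer-consumer.jar producer <topicname> <brokerhosts>
java -jar kafka-producer-consumer.jar consumer <topicname> <brokerhosts> [groupid]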

Build and deploy the example

  1. Download and extract the examples from https://github.com/Azure-Samples/hdinsight-kafka-java-get-started.

  2. Set your current directory to the location of the hdinsight-kafka-java-get-started\Producer-Consumer directory and use the following command:

    mvn clean package
    

    This command creates a directory named target, which contains a file named kafka-producer-consumer-1.0-SNAPSHOT.jar.

  3. Replace sshuser with the SSH user for your cluster, and replace CLUSTERNAME with the name of your cluster. Enter the following command to copy the kafka-producer-consumer-1.0-SNAPSHOT.jar file to your HDInsight cluster. When prompted, enter the password for the SSH user.

    scp ./target/kafka-producer-consumer-1.0-SNAPSHOT.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.cn:kafka-producer-consumer.jar
    

Run the example

  1. Replace sshuser with the SSH user for your cluster, and replace CLUSTERNAME with the name of your cluster. Open an SSH connection to the cluster by entering the following command. If prompted, enter the password for the SSH user account.

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.cn
    
  2. To get the Kafka broker hosts, substitute the values for <clustername> and <password> in the following command and execute it. Use the same casing for <clustername> as shown in the Azure portal. Replace <password> with the cluster login password, then execute:

    sudo apt -y install jq
    export clusterName='<clustername>'
    export password='<password>'
    export KAFKABROKERS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.cn/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
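
    To confirm that the variable is set, echo it. The output is a comma-separated list of host:port pairs; the host names shown here are placeholders and will differ for your cluster:

    echo $KAFKABROKERS
    # Illustrative output (placeholder host names):
    # wn0-kafka.<hostsuffix>:9092,wn1-kafka.<hostsuffix>:9092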
    

    Note

    This command requires Ambari access. If your cluster is behind an NSG, run this command from a machine that can access Ambari.

  3. Create a Kafka topic, myTest, by entering the following command:

    java -jar kafka-producer-consumer.jar create myTest $KAFKABROKERS
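
    For reference, topic creation in AdminClientWrapper.java goes through the Kafka admin API. The following is a minimal, hedged sketch of that approach, not the repository's exact code; the partition count of 8 matches what this tutorial states later, while the replication factor of 3 is an assumption:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.NewTopic;

    public class CreateTopicSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // args[0] is the broker list, for example the value of $KAFKABROKERS
            props.setProperty("bootstrap.servers", args[0]);
            try (AdminClient admin = AdminClient.create(props)) {
                // 8 partitions as stated later in this tutorial; replication factor 3 is an assumption
                NewTopic topic = new NewTopic("myTest", 8, (short) 3);
                admin.createTopics(Collections.singletonList(topic)).all().get();
            }
        }
    }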
    
  4. To run the producer and write data to the topic, use the following command:

    java -jar kafka-producer-consumer.jar producer myTest $KAFKABROKERS
    
  5. Once the producer has finished, use the following command to read from the topic:

    java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS
    

    The records read, along with a count of records, are displayed.

  6. Use Ctrl + C to exit the consumer.

Multiple consumers

Kafka consumers use a consumer group when reading records. Using the same group with multiple consumers results in load-balanced reads from a topic. Each consumer in the group receives a portion of the records.

The consumer application accepts a parameter that is used as the group ID. For example, the following command starts a consumer using a group ID of myGroup:

java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup

Use Ctrl + C to exit the consumer. To see this process in action, use the following command:

tmux new-session 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; split-window -h 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; attach

This command uses tmux to split the terminal into two columns. A consumer is started in each column, with the same group ID value. Once the consumers finish reading, notice that each read only a portion of the records. Use Ctrl + C twice to exit tmux.

Consumption by clients within the same group is handled through the partitions for the topic. In this code sample, the myTest topic created earlier has eight partitions. If you start eight consumers, each consumer reads records from a single partition for the topic.
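
If you want to see which partitions a given consumer instance was assigned, one hedged option (not part of the sample code) is to print the assignment after the first poll:

// After at least one poll(), the group coordinator has assigned partitions.
// (Uses org.apache.kafka.common.TopicPartition.)
for (TopicPartition tp : consumer.assignment()) {
    System.out.println("Assigned: " + tp.topic() + "-" + tp.partition());
}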

Important

There cannot be more consumer instances in a consumer group than there are partitions. In this example, one consumer group can contain up to eight consumers since that is the number of partitions in the topic. Or you can have multiple consumer groups, each with no more than eight consumers.

Records stored in Kafka are stored in the order they are received within a partition. To achieve in-order delivery of records within a partition, create a consumer group where the number of consumer instances matches the number of partitions. To achieve in-order delivery of records within the topic, create a consumer group with only one consumer instance.

Clean up resources

To clean up the resources created by this tutorial, you can delete the resource group. Deleting the resource group also deletes the associated HDInsight cluster, and any other resources associated with the resource group.

To remove the resource group using the Azure portal:

  1. In the Azure portal, expand the menu on the left side to open the menu of services, and then choose Resource groups to display the list of your resource groups.
  2. Locate the resource group to delete, and then right-click the More button (...) on the right side of the listing.
  3. Select Delete resource group, and then confirm.