教程:在 Azure HDInsight 中使用 Apache Kafka Streams API

了解如何创建一个使用 Apache Kafka Streams API 的应用程序并在 Kafka on HDInsight 上运行该应用程序。

本教程中使用的应用程序是流式处理字数统计。 它从 Kafka 主题读取文本数据、提取各个单词,然后将单词和计数存储到另一个 Kafka 主题。

Kafka 流处理通常使用 Apache Spark 完成。 Kafka 版本 2.1.1 和 2.4.1(在 HDInsight 4.0 和 5.0 中)支持 Kafka Streams API。 通过此 API 可以在输入和输出主题间转换数据流。

有关 Kafka Streams 的详细信息,请参阅 Apache.org 上的 Streams 简介文档。

在本教程中,你将了解如何执行以下操作:

  • 了解代码
  • 生成并部署应用程序
  • 配置 Kafka 主题
  • 运行代码

先决条件

了解代码

示例应用程序位于 Streaming 子目录的 https://github.com/Azure-Samples/hdinsight-kafka-java-get-started 中。 应用程序由两个文件组成:

  • pom.xml:此文件定义项目依赖项、Java 版本和打包方法。
  • Stream.java:此文件实现流式处理逻辑。

Pom.xml

pom.xml 文件中要了解的重要事项:

  • 依赖项:此项目依赖于 kafka-clients 包提供的 Kafka Streams API。 以下 XML 代码定义此依赖项:

    <!-- Kafka client for producer/consumer operations -->
    
    <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
    </dependency>
    

    ${kafka.version} 条目在 pom.xml<properties>..</properties> 部分进行声明,并配置为 HDInsight 群集的 Kafka 版本。

  • 插件:Maven 插件提供各种功能。 此项目使用了以下插件:

    • maven-compiler-plugin:用于将项目使用的 Java 版本设置为 8。 HDInsight 4.0 和 5.0 需要 Java 8。
    • maven-shade-plugin:用于生成包含此应用程序以及任何依赖项的 uber jar。 它还用于设置应用程序的入口点,以便直接运行 Jar 文件,而无需指定主类。

Stream.java

Stream.java 文件使用 Streams API 实现字数统计应用程序。 它从名为 test 的 Kafka 主题读取数据,并将字数统计写到名为 wordcounts 的主题中。

以下代码定义字数统计应用程序:

package com.microsoft.example;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import java.util.Arrays;
import java.util.Properties;

public class Stream
{
    public static void main( String[] args ) {
        Properties streamsConfig = new Properties();
        // The name must be unique on the Kafka cluster
        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-example");
        // Brokers
        streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, args[0]);
        // SerDes for key and values
        streamsConfig.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfig.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // Serdes for the word and count
        Serde<String> stringSerde = Serdes.String();
        Serde<Long> longSerde = Serdes.Long();

        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> sentences = builder.stream(stringSerde, stringSerde, "test");
        KStream<String, Long> wordCounts = sentences
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .map((key, word) -> new KeyValue<>(word, word))
                .countByKey("Counts")
                .toStream();
        wordCounts.to(stringSerde, longSerde, "wordcounts");

        KafkaStreams streams = new KafkaStreams(builder, streamsConfig);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

生成并部署示例

按照以下步骤生成项目并将其部署到 Kafka on HDInsight 群集:

  1. 将当前目录设置为 hdinsight-kafka-java-get-started-master\Streaming 目录的位置,然后使用以下命令创建一个 jar 包:

    mvn clean package
    

    此命令在 target/kafka-streaming-1.0-SNAPSHOT.jar 创建包。

  2. sshuser 替换为群集的 SSH 用户,并将 clustername 替换为群集的名称。 使用以下命令将 kafka-streaming-1.0-SNAPSHOT.jar 文件复制到 HDInsight 群集。 出现提示时,输入 SSH 用户帐户的密码。

    scp ./target/kafka-streaming-1.0-SNAPSHOT.jar sshuser@clustername-ssh.azurehdinsight.cn:kafka-streaming.jar
    

创建 Apache Kafka 主题

  1. sshuser 替换为群集的 SSH 用户,并将 CLUSTERNAME 替换为群集的名称。 输入以下命令,打开到群集的 SSH 连接。 出现提示时,输入 SSH 用户帐户的密码。

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.cn
    
  2. 安装 jq,一个命令行 JSON 处理程序。 在打开的 SSH 连接中,输入以下命令以安装 jq

    sudo apt -y install jq
    
  3. 设置密码变量。 将 PASSWORD 替换为群集登录密码,然后输入以下命令:

    export PASSWORD='PASSWORD'
    
  4. 提取具有正确大小写格式的群集名称。 群集名称的实际大小写格式可能出乎预期,具体取决于群集的创建方式。 此命令将获取实际的大小写,然后将其存储在变量中。 输入以下命令:

    export CLUSTER_NAME=$(curl -u admin:$PASSWORD -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    

    注意

    如果要从群集外部执行此过程,存储群集名称的过程则有所不同。 从 Azure 门户中获取采用小写格式的群集名称。 然后,将以下命令中的 <clustername> 替换为群集名称,并执行:export clusterName='<clustername>'

  5. 若要获取 Kafka 代理主机和 Apache Zookeeper 主机,请使用以下命令。 出现提示时,输入群集登录(管理员)帐户的密码。

    export KAFKAZKHOSTS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.cn/api/v1/clusters/$CLUSTER_NAME/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2);
    
    export KAFKABROKERS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.cn/api/v1/clusters/$CLUSTER_NAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
    

    注意

    这些命令需要 Ambari 访问权限。 如果群集位于 NSG 后面,请在可访问 Ambari 的计算机上运行这些命令。

  6. 若要创建流式处理操作所使用的主题,请使用以下命令:

    注意

    可能会收到错误:test 主题已存在。 此错误可忽略,因为该主题可能在生成者和使用者 API 教程中创建。

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic test --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcounts --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
    

    主题用于以下目的:

    • test:本主题是接收记录的位置。 流式处理应用程序从此处读取。
    • wordcounts:本主题是流式处理应用程序存储其输出的位置。
    • RekeyedIntermediateTopic:本主题用于在 countByKey 运算符更新计数时对数据进行重新分区。
    • wordcount-example-Counts-changelog:本主题是 countByKey 操作使用的状态存储

    Kafka on HDInsight 也可以配置为自动创建主题。 有关详细信息,请参阅配置自动主题创建文档。

运行代码

  1. 若要将流式处理应用程序作为后台进程启动,请使用以下命令:

    java -jar kafka-streaming.jar $KAFKABROKERS $KAFKAZKHOSTS &
    

    可能会收到有关 Apache log4j 的警告。 你可以忽略此警告。

  2. 若要将记录发送到 test 主题,请使用以下命令启动生成者应用程序:

    java -jar kafka-producer-consumer.jar producer test $KAFKABROKERS
    
  3. 生成者完成后,使用以下命令查看 wordcounts 主题中存储的信息:

    /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $KAFKABROKERS --topic wordcounts --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --from-beginning
    

    --property 参数指示控制台 consumer 输出关键字(字)以及计数(值)。 此参数还配置从 Kafka 中读取这些值时要使用的反序列化程序。

    输出与以下文本类似:

    dwarfs  13635
    ago     13664
    snow    13636
    dwarfs  13636
    ago     13665
    a       13803
    ago     13666
    a       13804
    ago     13667
    ago     13668
    jumped  13640
    jumped  13641
    

    参数 --from-beginning 将使用者配置为从主题中存储的记录开头启动。 每遇到一个字,计数都会递增,主题包含每个字的多个条目,因此计数不断增加。

  4. 使用 Ctrl + C 退出 producer。 继续使用 Ctrl + C 退出应用程序和 consumer。

  5. 若要删除流式处理操作所使用的主题,请使用以下命令:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic test --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcounts --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
    

清理资源

若要清理本教程创建的资源,可以删除资源组。 删除资源组也会删除相关联的 HDInsight 群集,以及与资源组相关联的任何其他资源。

若要使用 Azure 门户删除资源组,请执行以下操作:

  1. 在 Azure 门户中展开左侧的菜单,打开服务菜单,然后选择“资源组”以显示资源组的列表。
  2. 找到要删除的资源组,然后右键单击列表右侧的“更多”按钮 (...)。
  3. 选择“删除资源组”,然后进行确认。

后续步骤

本文档介绍了如何将 Apache Kafka Streams API 与 Kafka on HDInsight 配合使用。 使用以下内容,详细了解如何使用 Kafka。