了解如何创建一个使用 Apache Kafka Streams API 的应用程序并在 Kafka on HDInsight 上运行该应用程序。
本教程中使用的应用程序是流式处理字数统计。 它从 Kafka 主题读取文本数据、提取各个单词,然后将单词和计数存储到另一个 Kafka 主题。
Kafka 流处理通常使用 Apache Spark 完成。 Kafka 版本 2.1.1 和 2.4.1(在 HDInsight 4.0 和 5.0 中)支持 Kafka Streams API。 通过此 API 可以在输入和输出主题间转换数据流。
有关 Kafka Streams 的详细信息,请参阅 Apache.org 上的 Streams 简介文档。
在本教程中,你将了解如何执行以下操作:
- 了解代码
- 生成并部署应用程序
- 配置 Kafka 主题
- 运行代码
HDInsight 4.0 或 5.0 群集上的 Kafka。 若要了解如何创建 Kafka on HDInsight 群集,请参阅 Apache Kafka on HDInsight 入门文档。
完成 Apache Kafka 使用者和生成者 API 文档中的步骤。 本文档中的步骤使用本教程中创建的示例应用程序和主题。
Java Developer Kit (JDK) 版本 8 或等效工具,例如 OpenJDK。
根据 Apache 要求正确安装的 Apache Maven。 Maven 是 Java 项目的项目生成系统。
SSH 客户端。 有关详细信息,请参阅使用 SSH 连接到 HDInsight (Apache Hadoop)。
示例应用程序位于 Streaming
子目录的 https://github.com/Azure-Samples/hdinsight-kafka-java-get-started 中。 应用程序由两个文件组成:
pom.xml
:此文件定义项目依赖项、Java 版本和打包方法。Stream.java
:此文件实现流式处理逻辑。
在 pom.xml
文件中要了解的重要事项:
依赖项:此项目依赖于
kafka-clients
包提供的 Kafka Streams API。 以下 XML 代码定义此依赖项:<!-- Kafka client for producer/consumer operations --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> </dependency>
${kafka.version}
条目在pom.xml
的<properties>..</properties>
部分进行声明,并配置为 HDInsight 群集的 Kafka 版本。插件:Maven 插件提供各种功能。 此项目使用了以下插件:
maven-compiler-plugin
:用于将项目使用的 Java 版本设置为 8。 HDInsight 4.0 和 5.0 需要 Java 8。maven-shade-plugin
:用于生成包含此应用程序以及任何依赖项的 uber jar。 它还用于设置应用程序的入口点,以便直接运行 Jar 文件,而无需指定主类。
Stream.java 文件使用 Streams API 实现字数统计应用程序。 它从名为 test
的 Kafka 主题读取数据,并将字数统计写到名为 wordcounts
的主题中。
以下代码定义字数统计应用程序:
package com.microsoft.example;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import java.util.Arrays;
import java.util.Properties;
public class Stream
{
public static void main( String[] args ) {
Properties streamsConfig = new Properties();
// The name must be unique on the Kafka cluster
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-example");
// Brokers
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, args[0]);
// SerDes for key and values
streamsConfig.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfig.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// Serdes for the word and count
Serde<String> stringSerde = Serdes.String();
Serde<Long> longSerde = Serdes.Long();
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> sentences = builder.stream(stringSerde, stringSerde, "test");
KStream<String, Long> wordCounts = sentences
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, word) -> new KeyValue<>(word, word))
.countByKey("Counts")
.toStream();
wordCounts.to(stringSerde, longSerde, "wordcounts");
KafkaStreams streams = new KafkaStreams(builder, streamsConfig);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
按照以下步骤生成项目并将其部署到 Kafka on HDInsight 群集:
将当前目录设置为
hdinsight-kafka-java-get-started-master\Streaming
目录的位置,然后使用以下命令创建一个 jar 包:mvn clean package
此命令在
target/kafka-streaming-1.0-SNAPSHOT.jar
创建包。将
sshuser
替换为群集的 SSH 用户,并将clustername
替换为群集的名称。 使用以下命令将kafka-streaming-1.0-SNAPSHOT.jar
文件复制到 HDInsight 群集。 出现提示时,输入 SSH 用户帐户的密码。scp ./target/kafka-streaming-1.0-SNAPSHOT.jar sshuser@clustername-ssh.azurehdinsight.cn:kafka-streaming.jar
将
sshuser
替换为群集的 SSH 用户,并将CLUSTERNAME
替换为群集的名称。 输入以下命令,打开到群集的 SSH 连接。 出现提示时,输入 SSH 用户帐户的密码。ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.cn
安装 jq,一个命令行 JSON 处理程序。 在打开的 SSH 连接中,输入以下命令以安装
jq
:sudo apt -y install jq
设置密码变量。 将
PASSWORD
替换为群集登录密码,然后输入以下命令:export PASSWORD='PASSWORD'
提取具有正确大小写格式的群集名称。 群集名称的实际大小写格式可能出乎预期,具体取决于群集的创建方式。 此命令将获取实际的大小写,然后将其存储在变量中。 输入以下命令:
export CLUSTER_NAME=$(curl -u admin:$PASSWORD -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
备注
如果要从群集外部执行此过程,存储群集名称的过程则有所不同。 从 Azure 门户中获取采用小写格式的群集名称。 然后,将以下命令中的
<clustername>
替换为群集名称,并执行:export clusterName='<clustername>'
。若要获取 Kafka 代理主机和 Apache Zookeeper 主机,请使用以下命令。 出现提示时,输入群集登录(管理员)帐户的密码。
export KAFKAZKHOSTS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.cn/api/v1/clusters/$CLUSTER_NAME/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2); export KAFKABROKERS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.cn/api/v1/clusters/$CLUSTER_NAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
备注
这些命令需要 Ambari 访问权限。 如果群集位于 NSG 后面,请在可访问 Ambari 的计算机上运行这些命令。
若要创建流式处理操作所使用的主题,请使用以下命令:
备注
可能会收到错误:
test
主题已存在。 此错误可忽略,因为该主题可能在生成者和使用者 API 教程中创建。/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic test --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcounts --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
主题用于以下目的:
test
:本主题是接收记录的位置。 流式处理应用程序从此处读取。wordcounts
:本主题是流式处理应用程序存储其输出的位置。RekeyedIntermediateTopic
:本主题用于在countByKey
运算符更新计数时对数据进行重新分区。wordcount-example-Counts-changelog
:本主题是countByKey
操作使用的状态存储
Kafka on HDInsight 也可以配置为自动创建主题。 有关详细信息,请参阅配置自动主题创建文档。
若要将流式处理应用程序作为后台进程启动,请使用以下命令:
java -jar kafka-streaming.jar $KAFKABROKERS $KAFKAZKHOSTS &
可能会收到有关 Apache
log4j
的警告。 你可以忽略此警告。若要将记录发送到
test
主题,请使用以下命令启动生成者应用程序:java -jar kafka-producer-consumer.jar producer test $KAFKABROKERS
生成者完成后,使用以下命令查看
wordcounts
主题中存储的信息:/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $KAFKABROKERS --topic wordcounts --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --from-beginning
--property
参数指示控制台 consumer 输出关键字(字)以及计数(值)。 此参数还配置从 Kafka 中读取这些值时要使用的反序列化程序。输出与以下文本类似:
dwarfs 13635 ago 13664 snow 13636 dwarfs 13636 ago 13665 a 13803 ago 13666 a 13804 ago 13667 ago 13668 jumped 13640 jumped 13641
参数
--from-beginning
将使用者配置为从主题中存储的记录开头启动。 每遇到一个字,计数都会递增,主题包含每个字的多个条目,因此计数不断增加。使用 Ctrl + C 退出 producer。 继续使用 Ctrl + C 退出应用程序和 consumer。
若要删除流式处理操作所使用的主题,请使用以下命令:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic test --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcounts --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
若要清理本教程创建的资源,可以删除资源组。 删除资源组也会删除相关联的 HDInsight 群集,以及与资源组相关联的任何其他资源。
若要使用 Azure 门户删除资源组,请执行以下操作:
- 在 Azure 门户中展开左侧的菜单,打开服务菜单,然后选择“资源组”以显示资源组的列表。
- 找到要删除的资源组,然后右键单击列表右侧的“更多”按钮 (...)。
- 选择“删除资源组”,然后进行确认。
本文档介绍了如何将 Apache Kafka Streams API 与 Kafka on HDInsight 配合使用。 使用以下内容,详细了解如何使用 Kafka。