Tutorial: Use Apache Storm with Apache Kafka on HDInsight

This tutorial demonstrates how to use an Apache Storm topology to read and write data with Apache Kafka on HDInsight. It also demonstrates how to persist data to the Apache Hadoop HDFS-compatible storage on the Storm cluster.

In this tutorial, you learn how to:

  • Storm and Kafka
  • Understanding the code
  • Create Kafka and Storm clusters
  • Build the topology
  • Configure the topology
  • Create the Kafka topic
  • Start the topologies
  • Stop the topologies
  • Clean up resources

Prerequisites

The following environment variables may be set when you install Java and the JDK on your development workstation. However, you should check that they exist and that they contain the correct values for your system.

  • JAVA_HOME - should point to the directory where the JDK is installed.

  • PATH - should contain the following paths:

    • JAVA_HOME (or the equivalent path).
    • JAVA_HOME\bin (or the equivalent path).
    • The directory where Maven is installed.

Important

The steps in this document require an Azure resource group that contains both a Storm on HDInsight and a Kafka on HDInsight cluster. These clusters are both located within an Azure Virtual Network, which allows the Storm cluster to communicate directly with the Kafka cluster.

For your convenience, this document links to a template that can create all the required Azure resources.

For more information on using HDInsight in a virtual network, see the Extend HDInsight using a virtual network document.

Storm and Kafka

Apache Storm provides several components for working with Apache Kafka. The following components are used in this tutorial; a brief Java sketch later in this section shows how they fit together in code:

  • org.apache.storm.kafka.KafkaSpout: This component reads data from Kafka. It relies on the following components:

    • org.apache.storm.kafka.SpoutConfig: Provides configuration for the spout component.

    • org.apache.storm.spout.SchemeAsMultiScheme and org.apache.storm.kafka.StringScheme: How the data from Kafka is transformed into a Storm tuple.

  • org.apache.storm.kafka.bolt.KafkaBolt: This component writes data to Kafka. It relies on the following components:

    • org.apache.storm.kafka.bolt.selector.DefaultTopicSelector: Describes the topic that is written to.

    • org.apache.kafka.common.serialization.StringSerializer: Configures the bolt to serialize data as a string value.

    • org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper: Maps from the tuple data structure used inside the Storm topology to fields stored in Kafka.

These components are available in the org.apache.storm : storm-kafka package. Use the package version that matches the Storm version. For HDInsight 3.6, the Storm version is 1.1.0. You also need the org.apache.kafka : kafka_2.10 package, which contains additional Kafka components. Use the package version that matches the Kafka version. For HDInsight 3.6, the Kafka version is 1.1.1.
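
The same components can also be wired together directly in Java instead of through Flux. The following is a minimal, hypothetical sketch of that wiring; the host names, topic, and class name are placeholders rather than values from the sample project, and the reading and writing sides are combined here only to show how the pieces connect (in the sample project they live in separate topologies).

import java.util.Properties;

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.kafka.bolt.KafkaBolt;
import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;

public class ExampleKafkaTopology {
    public static void main(String[] args) throws Exception {
        // Reading from Kafka: the spout needs the ZooKeeper hosts, a topic, and a
        // scheme that turns the raw Kafka messages into Storm tuples of strings.
        SpoutConfig spoutConfig = new SpoutConfig(
                new ZkHosts("zk0-kafka.example:2181,zk1-kafka.example:2181"), // placeholder hosts
                "stormtopic",        // topic to read from
                "",                  // zkRoot
                "example-reader");   // consumer id
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        // Writing to Kafka: the bolt needs producer properties, a topic selector,
        // and a mapper that picks the "key" and "message" fields from each tuple.
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "wn0-kafka.example:9092,wn1-kafka.example:9092"); // placeholder hosts
        producerProps.put("acks", "1");
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaBolt<String, String> kafkaBolt = new KafkaBolt<String, String>()
                .withProducerProperties(producerProps)
                .withTopicSelector(new DefaultTopicSelector("stormtopic"))
                .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<String, String>("key", "message"));

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 8);
        builder.setBolt("kafka-bolt", kafkaBolt, 8).shuffleGrouping("kafka-spout");

        StormSubmitter.submitTopology("example-kafka-topology", new Config(), builder.createTopology());
    }
}

The Flux YAML files shown later in this tutorial declare the same objects and setter calls, but in YAML instead of Java code.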

The following XML is the dependency declaration in the pom.xml for an Apache Maven project:

<!-- Storm components for talking to Kafka -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>1.1.0</version>
</dependency>
<!-- needs to be the same Kafka version as used on your cluster -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>1.1.1</version>
    <!-- Exclude components that are loaded from the Storm cluster at runtime -->
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>

Understanding the code

The code used in this document is available at https://github.com/Azure-Samples/hdinsight-storm-java-kafka.

There are two topologies provided with this tutorial:

  • Kafka-writer: Generates random sentences and stores them to Kafka.

  • Kafka-reader: Reads data from Kafka and then stores it to the HDFS-compatible file store for the Storm cluster.

    Warning

    To enable Storm to work with the HDFS-compatible storage used by HDInsight, a script action is required. The script installs several jar files to the extlib path for Storm. The template in this tutorial automatically uses the script during cluster creation.

    If you do not use the template in this document to create the Storm cluster, then you must manually apply the script action to your cluster.

    The script action is located at https://hdiconfigactions.blob.core.windows.net/linuxstormextlibv01/stormextlib.sh and is applied to the supervisor and nimbus nodes of the Storm cluster. For more information on using script actions, see the Customize HDInsight using script actions document.

The topologies are defined using Flux. Flux was introduced in Storm 0.10.x and allows you to separate the topology configuration from the code. For topologies that use the Flux framework, the topology is defined in a YAML file. The YAML file can be included as part of the topology, or it can be a standalone file used when you submit the topology. Flux also supports variable substitution at run time, which is used in this example.

The following parameters are set at run time for these topologies:

  • ${kafka.topic}: The name of the Kafka topic that the topologies read/write to.

  • ${kafka.broker.hosts}: The hosts that the Kafka brokers run on. The broker information is used by the KafkaBolt when writing to Kafka.

  • ${kafka.zookeeper.hosts}: The hosts that Zookeeper runs on in the Kafka cluster.

  • ${hdfs.url}: The file system URL for the HDFSBolt component. Indicates whether the data is written to an Azure Storage account or Azure Data Lake Storage.

  • ${hdfs.write.dir}: The directory that data is written to.

For more information on Flux topologies, see https://storm.apache.org/releases/1.1.2/flux.html.

Kafka-writer

In the Kafka-writer topology, the Kafka bolt component takes two string values as parameters. These parameters indicate which tuple fields the bolt sends to Kafka as key and message values. The key is used to partition data in Kafka. The message is the data being stored.

In this example, the com.microsoft.example.SentenceSpout component emits a tuple that contains two fields, key and message. The Kafka bolt extracts these fields and sends the data in them to Kafka.

The fields don't have to use the names key and message. These names are used in this project to make the mapping easier to understand.
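
The following is a rough, hypothetical sketch of a sentence spout, not the actual com.microsoft.example.SentenceSpout source from the sample project. It shows the only contract that matters to the Kafka bolt: declare and emit a tuple whose field names match the ones given to FieldNameBasedTupleToKafkaMapper.

import java.util.Map;
import java.util.UUID;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

public class ExampleSentenceSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private final String[] sentences = {
        "four score and seven years ago",
        "snow white and the seven dwarfs",
        "i am at two with nature"
    };

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        Utils.sleep(100);
        String sentence = sentences[(int) (Math.random() * sentences.length)];
        // "key" is used to partition the data in Kafka; "message" is the data that is stored.
        collector.emit(new Values(UUID.randomUUID().toString(), sentence));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // These field names must match the arguments passed to
        // FieldNameBasedTupleToKafkaMapper in the topology definition.
        declarer.declare(new Fields("key", "message"));
    }
}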

The following YAML is the definition for the Kafka-writer component:

# kafka-writer
---

# topology definition
# name to be used when submitting
name: "kafka-writer"

# Components - constructors, property setters, and builder arguments.
# Currently, components must be declared in the order they are referenced
components:
  # Topic selector for KafkaBolt
  - id: "topicSelector"
    className: "org.apache.storm.kafka.bolt.selector.DefaultTopicSelector"
    constructorArgs:
      - "${kafka.topic}"

  # Mapper for KafkaBolt
  - id: "kafkaMapper"
    className: "org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper"
    constructorArgs:
      - "key"
      - "message"

  # Producer properties for KafkaBolt
  - id: "producerProperties"
    className: "java.util.Properties"
    configMethods:
      - name: "put"
        args:
          - "bootstrap.servers"
          - "${kafka.broker.hosts}"
      - name: "put"
        args:
          - "acks"
          - "1"
      - name: "put"
        args:
          - "key.serializer"
          - "org.apache.kafka.common.serialization.StringSerializer"
      - name: "put"
        args:
          - "value.serializer"
          - "org.apache.kafka.common.serialization.StringSerializer"
 

# Topology configuration
config:
  topology.workers: 2

# Spout definitions
spouts:
  - id: "sentence-spout"
    className: "com.microsoft.example.SentenceSpout"
    parallelism: 8

# Bolt definitions
bolts:
  - id: "kafka-bolt"
    className: "org.apache.storm.kafka.bolt.KafkaBolt"
    parallelism: 8
    configMethods:
    - name: "withProducerProperties"
      args: [ref: "producerProperties"]
    - name: "withTopicSelector"
      args: [ref: "topicSelector"]
    - name: "withTupleToKafkaMapper"
      args: [ref: "kafkaMapper"]

# Stream definitions

streams:
  - name: "spout --> kafka" # Streams data from the sentence spout to the Kafka bolt
    from: "sentence-spout"
    to: "kafka-bolt"
    grouping:
      type: SHUFFLE

Kafka-reader

In the Kafka-reader topology, the spout component reads data from Kafka as string values. The data is then written to the Storm log by the logging component, and to the HDFS-compatible file system for the Storm cluster by the HDFS bolt component. The following YAML is the definition for the Kafka-reader component:

# kafka-reader
---

# topology definition
# name to be used when submitting
name: "kafka-reader"

# Components - constructors, property setters, and builder arguments.
# Currently, components must be declared in the order they are referenced
components:
  # Convert data from Kafka into string tuples in storm
  - id: "stringScheme"
    className: "org.apache.storm.kafka.StringScheme"
  - id: "stringMultiScheme"
    className: "org.apache.storm.spout.SchemeAsMultiScheme"
    constructorArgs:
      - ref: "stringScheme"

  - id: "zkHosts"
    className: "org.apache.storm.kafka.ZkHosts"
    constructorArgs:
      - "${kafka.zookeeper.hosts}"

  # Spout configuration
  - id: "spoutConfig"
    className: "org.apache.storm.kafka.SpoutConfig"
    constructorArgs:
      # brokerHosts
      - ref: "zkHosts"
      # topic
      - "${kafka.topic}"
      # zkRoot
      - ""
      # id
      - "readerid"
    properties:
      - name: "scheme"
        ref: "stringMultiScheme"

    # How often to sync files to HDFS; here, after every tuple.
  - id: "syncPolicy"
    className: "org.apache.storm.hdfs.bolt.sync.CountSyncPolicy"
    constructorArgs:
      - 1

  # Rotate files when they hit 5 KB
  - id: "rotationPolicy"
    className: "org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy"
    constructorArgs:
      - 5
      - "KB"

  # File format; read the directory from filters at run time, and use a .txt extension when writing.
  - id: "fileNameFormat"
    className: "org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat"
    configMethods:
      - name: "withPath"
        args: ["${hdfs.write.dir}"]
      - name: "withExtension"
        args: [".txt"]

  # Internal file format; fields delimited by `|`.
  - id: "recordFormat"
    className: "org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat"
    configMethods:
      - name: "withFieldDelimiter"
        args: ["|"]

# Topology configuration
config:
  topology.workers: 2

# Spout definitions
spouts:
  - id: "kafka-spout"
    className: "org.apache.storm.kafka.KafkaSpout"
    constructorArgs:
      - ref: "spoutConfig"
    # Set to the number of partitions for the topic
    parallelism: 8

# Bolt definitions
bolts:
  - id: "logger-bolt"
    className: "com.microsoft.example.LoggerBolt"
    parallelism: 1
  
  - id: "hdfs-bolt"
    className: "org.apache.storm.hdfs.bolt.HdfsBolt"
    configMethods:
      - name: "withConfigKey"
        args: ["hdfs.config"]
      - name: "withFsUrl"
        args: ["${hdfs.url}"]
      - name: "withFileNameFormat"
        args: [ref: "fileNameFormat"]
      - name: "withRecordFormat"
        args: [ref: "recordFormat"]
      - name: "withRotationPolicy"
        args: [ref: "rotationPolicy"]
      - name: "withSyncPolicy"
        args: [ref: "syncPolicy"]
    parallelism: 1

# Stream definitions

streams:
  # Stream data to log
  - name: "kafka --> log" # name isn't used (placeholder for logging, UI, etc.)
    from: "kafka-spout"
    to: "logger-bolt"
    grouping:
      type: SHUFFLE
  
  # stream data to file
  - name: "kafka --> hdfs"
    from: "kafka-spout"
    to: "hdfs-bolt"
    grouping:
      type: SHUFFLE
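
The com.microsoft.example.LoggerBolt class referenced above is not listed in this document. As a hypothetical sketch (the actual class in the sample repository may differ), a minimal logging bolt only needs to log each incoming tuple; the log output ends up in the Storm worker logs, which is what this tutorial means by writing the data to the Storm log.

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExampleLoggerBolt extends BaseBasicBolt {
    private static final Logger LOG = LoggerFactory.getLogger(ExampleLoggerBolt.class);

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // The Kafka spout (with StringScheme) emits each message as a single string field.
        // BaseBasicBolt acknowledges the tuple automatically after execute returns.
        LOG.info("Received: {}", tuple.getString(0));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // This bolt only logs; it doesn't emit any tuples downstream.
    }
}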

Property substitutions

The project contains a file named dev.properties that is used to pass parameters used by the topologies. It defines the following properties:

dev.properties file     Description
kafka.zookeeper.hosts   The Apache ZooKeeper hosts for the Kafka cluster.
kafka.broker.hosts      The Kafka broker hosts (worker nodes).
kafka.topic             The Kafka topic that the topologies use.
hdfs.write.dir          The directory that the Kafka-reader topology writes to.
hdfs.url                The file system used by the Storm cluster. For Azure Storage accounts, use a value of wasb:///. For Azure Data Lake Storage Gen2, use a value of abfs:///.

Create the clusters

Apache Kafka on HDInsight does not provide access to the Kafka brokers over the public internet. Anything that uses Kafka must be in the same Azure virtual network. In this tutorial, both the Kafka and Storm clusters are located in the same Azure virtual network.

The following diagram shows how communication flows between Storm and Kafka:

Diagram of the Storm and Kafka clusters in an Azure virtual network

Note

Other services on the cluster, such as SSH and Apache Ambari, can be accessed over the internet. For more information on the public ports available with HDInsight, see Ports and URIs used by HDInsight.

To create an Azure Virtual Network, and then create the Kafka and Storm clusters within it, use the following steps:

  1. Use the following button to sign in to Azure and open the template in the Azure portal.

    Deploy to Azure

    The Azure Resource Manager template is located at https://github.com/Azure-Samples/hdinsight-storm-java-kafka/blob/master/create-kafka-storm-clusters-in-vnet.json. It creates the following resources:

    • Azure resource group
    • Azure Virtual Network
    • Azure Storage account
    • Kafka on HDInsight version 3.6 (three worker nodes)
    • Storm on HDInsight version 3.6 (three worker nodes)

    Warning

    To guarantee availability of Kafka on HDInsight, your cluster must contain at least three worker nodes. This template creates a Kafka cluster that contains three worker nodes.

  2. Use the following guidance to populate the entries on the Custom deployment section:

    1. Use the following information to populate the entries on the Customized template section:

      Setting                    Value
      Subscription               Your Azure subscription
      Resource group             The resource group that contains the resources.
      Location                   The Azure region that the resources are created in.
      Kafka Cluster Name         The name of the Kafka cluster.
      Storm Cluster Name         The name of the Storm cluster.
      Cluster Login User Name    The admin user name for the clusters.
      Cluster Login Password     The admin user password for the clusters.
      SSH User Name              The SSH user to create for the clusters.
      SSH Password               The password for the SSH user.

      Image of the template parameters

  3. Read the Terms and Conditions, and then select I agree to the terms and conditions stated above.

  4. Finally, check Pin to dashboard and then select Purchase.

Note

It can take up to 20 minutes to create the clusters.

Build the topology

  1. On your development environment, download the project from https://github.com/Azure-Samples/hdinsight-storm-java-kafka, open a command line, and change directories to the location that you downloaded the project to.

  2. From the hdinsight-storm-java-kafka directory, use the following command to compile the project and create a package for deployment:

    mvn clean package
    

    The package process creates a file named KafkaTopology-1.0-SNAPSHOT.jar in the target directory.

  3. Use the following command to copy the package to your Storm on HDInsight cluster. Replace sshuser with the SSH user name for the cluster. Replace stormclustername with the name of the Storm cluster.

    scp ./target/KafkaTopology-1.0-SNAPSHOT.jar sshuser@stormclustername-ssh.azurehdinsight.cn:KafkaTopology-1.0-SNAPSHOT.jar
    

    When prompted, enter the password you used when creating the clusters.

Configure the topology

  1. Use one of the following methods to discover the Kafka broker hosts for the Kafka on HDInsight cluster:

    $creds = Get-Credential -UserName "admin" -Message "Enter the HDInsight login"
    $clusterName = Read-Host -Prompt "Enter the Kafka cluster name"
    $resp = Invoke-WebRequest -Uri "https://$clusterName.azurehdinsight.cn/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER" `
        -Credential $creds `
        -UseBasicParsing
    $respObj = ConvertFrom-Json $resp.Content
    $brokerHosts = $respObj.host_components.HostRoles.host_name[0..1]
    ($brokerHosts -join ":9092,") + ":9092"
    

    Important

    The following Bash example assumes that $CLUSTERNAME contains the name of the Kafka cluster. It also assumes that jq version 1.5 or greater is installed. When prompted, enter the password for the cluster login account.

    curl -su admin -G "https://$CLUSTERNAME.azurehdinsight.cn/api/v1/clusters/$CLUSTERNAME/services/KAFKA/components/KAFKA_BROKER" | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2
    

    The value returned is similar to the following text:

     wn0-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.chinacloudapp.cn:9092,wn1-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.chinacloudapp.cn:9092
    

    Important

    While there may be more than two broker hosts for your cluster, you do not need to provide a full list of all hosts to clients. One or two is enough.

  2. Use one of the following methods to discover the Zookeeper hosts for the Kafka on HDInsight cluster:

    $creds = Get-Credential -UserName "admin" -Message "Enter the HDInsight login"
    $clusterName = Read-Host -Prompt "Enter the Kafka cluster name"
    $resp = Invoke-WebRequest -Uri "https://$clusterName.azurehdinsight.cn/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER" `
        -Credential $creds `
        -UseBasicParsing
    $respObj = ConvertFrom-Json $resp.Content
    $zookeeperHosts = $respObj.host_components.HostRoles.host_name[0..1]
    ($zookeeperHosts -join ":2181,") + ":2181"
    

    Important

    The following Bash example assumes that $CLUSTERNAME contains the name of the Kafka cluster. It also assumes that jq is installed. When prompted, enter the password for the cluster login account.

    curl -su admin -G "https://$CLUSTERNAME.azurehdinsight.cn/api/v1/clusters/$CLUSTERNAME/services/ZOOKEEPER/components/ZOOKEEPER_SERVER" | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2
    

    The value returned is similar to the following text:

     zk0-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.chinacloudapp.cn:2181,zk2-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.chinacloudapp.cn:2181
    

    Important

    While there are more than two Zookeeper nodes, you do not need to provide a full list of all hosts to clients. One or two is enough.

    Save this value, as it is used later.

  3. Edit the dev.properties file in the root of the project. Add the broker and Zookeeper host information for the Kafka cluster to the matching lines in this file. The following example is configured using the sample values from the previous steps:

     kafka.zookeeper.hosts: zk0-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.chinacloudapp.cn:2181,zk2-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.chinacloudapp.cn:2181
     kafka.broker.hosts: wn0-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.chinacloudapp.cn:9092,wn1-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.chinacloudapp.cn:9092
     kafka.topic: stormtopic
    

    Important

    The hdfs.url entry is configured for a cluster that uses an Azure Storage account. To use this topology with a Storm cluster that uses Data Lake Storage, change this value from wasb to adl.

  4. Save the dev.properties file, and then use the following command to upload it to the Storm cluster:

    scp dev.properties USERNAME@BASENAME-ssh.azurehdinsight.cn:dev.properties
    

    Replace USERNAME with the SSH user name for the cluster. Replace BASENAME with the base name you used when creating the cluster.

Create the Kafka topic

Kafka stores data into a topic. You must create the topic before starting the Storm topologies. To create the topic, use the following steps:

  1. Connect to the Kafka cluster through SSH by using the following command. Replace sshuser with the SSH user name used when creating the cluster. Replace kafkaclustername with the name of the Kafka cluster:

    ssh sshuser@kafkaclustername-ssh.azurehdinsight.cn
    

    When prompted, enter the password you used when creating the clusters.

    For information, see Use SSH with HDInsight.

  2. To create the Kafka topic, use the following command. Replace $KAFKAZKHOSTS with the Zookeeper host information you used when configuring the topology:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic stormtopic --zookeeper $KAFKAZKHOSTS
    

    This command connects to Zookeeper for the Kafka cluster and creates a new topic named stormtopic. This topic is used by the Storm topologies.

Start the writer

  1. Use the following command to connect to the Storm cluster using SSH. Replace sshuser with the SSH user name used when creating the cluster. Replace stormclustername with the name of the Storm cluster:

    ssh sshuser@stormclustername-ssh.azurehdinsight.cn
    

    When prompted, enter the password you used when creating the clusters.

    For information, see Use SSH with HDInsight.

  2. From the SSH connection to the Storm cluster, use the following command to start the writer topology:

    storm jar KafkaTopology-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --remote -R /writer.yaml --filter dev.properties
    

    The parameters used with this command are:

    • org.apache.storm.flux.Flux: Use Flux to configure and run this topology.

    • --remote: Submit the topology to Nimbus. The topology is distributed across the worker nodes in the cluster.

    • -R /writer.yaml: Use the writer.yaml file to configure the topology. -R indicates that this resource is included in the jar file. It's in the root of the jar, so /writer.yaml is the path to it.

    • --filter: Populate entries in the writer.yaml topology using values in the dev.properties file. For example, the value of the kafka.topic entry in the file is used to replace the ${kafka.topic} entry in the topology definition.

Start the reader

  1. From the SSH session to the Storm cluster, use the following command to start the reader topology:

    storm jar KafkaTopology-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --remote -R /reader.yaml --filter dev.properties
    
  2. Wait a minute and then use the following command to view the files created by the reader topology:

    hdfs dfs -ls /stormdata
    

    The output is similar to the following text:

     Found 173 items
     -rw-r--r--   1 storm supergroup       5137 2018-04-09 19:00 /stormdata/hdfs-bolt-4-0-1523300453088.txt
     -rw-r--r--   1 storm supergroup       5128 2018-04-09 19:00 /stormdata/hdfs-bolt-4-1-1523300453624.txt
     -rw-r--r--   1 storm supergroup       5131 2018-04-09 19:00 /stormdata/hdfs-bolt-4-10-1523300455170.txt
     ...
    
  3. To view the contents of a file, use the following command. Replace filename.txt with the name of a file:

    hdfs dfs -cat /stormdata/filename.txt
    

    The following text is an example of the file contents:

     four score and seven years ago
     snow white and the seven dwarfs
     i am at two with nature
     snow white and the seven dwarfs
     i am at two with nature
     four score and seven years ago
     an apple a day keeps the doctor away
    

Stop the topologies

From an SSH session to the Storm cluster, use the following commands to stop the Storm topologies:

storm kill kafka-writer
storm kill kafka-reader

Clean up resources

To clean up the resources created by this tutorial, you can delete the resource group. Deleting the resource group also deletes the associated HDInsight cluster, and any other resources associated with the resource group.

To remove the resource group using the Azure portal:

  1. In the Azure portal, expand the menu on the left side to open the menu of services, and then choose Resource Groups to display the list of your resource groups.
  2. Locate the resource group to delete, and then right-click the More button (...) on the right side of the listing.
  3. Select Delete resource group, and then confirm.

Next steps

In this tutorial, you learned how to use an Apache Storm topology to write to and read from Apache Kafka on HDInsight. You also learned how to store data to the Apache Hadoop HDFS-compatible storage used by HDInsight.