Create an Apache Storm topology in Java

Learn how to create a Java-based topology for Apache Storm. Here, you create a Storm topology that implements a word-count application. You use Apache Maven to build and package the project. Then, you learn how to define the topology using the Apache Storm Flux framework.

After completing the steps in this document, you can deploy the topology to Apache Storm on HDInsight.

Note

A completed version of the Storm topology examples created in this document is available at https://github.com/Azure-Samples/hdinsight-java-storm-wordcount.

Prerequisites

Test environment

The environment used for this article was a computer running Windows 10. The commands were executed in a command prompt, and the various files were edited with Notepad.

From a command prompt, enter the commands below to create a working environment:

mkdir C:\HDI
cd C:\HDI

Create a Maven project

Enter the following command to create a Maven project named WordCount:

mvn archetype:generate -DarchetypeArtifactId=maven-archetype-quickstart -DgroupId=com.microsoft.example -DartifactId=WordCount -DinteractiveMode=false

cd WordCount
mkdir resources

This command creates a directory named WordCount at the current location, which contains a basic Maven project. The second command changes the present working directory to WordCount. The third command creates a new directory, resources, which is used later. The WordCount directory contains the following items:

  • pom.xml: Contains settings for the Maven project.
  • src\main\java\com\microsoft\example: Contains your application code.
  • src\test\java\com\microsoft\example: Contains tests for your application.

Remove the generated example code

Delete the generated test and application files AppTest.java and App.java by entering the commands below:

DEL src\main\java\com\microsoft\example\App.java
DEL src\test\java\com\microsoft\example\AppTest.java

Add Maven repositories

HDInsight is based on the Hortonworks Data Platform (HDP), so we recommend using the Hortonworks repository to download dependencies for your Apache Storm projects.

Open pom.xml by entering the command below:

notepad pom.xml

Then add the following XML after the <url>https://maven.apache.org</url> line:

<repositories>
    <repository>
        <releases>
            <enabled>true</enabled>
            <updatePolicy>always</updatePolicy>
            <checksumPolicy>warn</checksumPolicy>
        </releases>
        <snapshots>
            <enabled>false</enabled>
            <updatePolicy>never</updatePolicy>
            <checksumPolicy>fail</checksumPolicy>
        </snapshots>
        <id>HDPReleases</id>
        <name>HDP Releases</name>
        <url>https://repo.hortonworks.com/content/repositories/releases/</url>
        <layout>default</layout>
    </repository>
    <repository>
        <releases>
            <enabled>true</enabled>
            <updatePolicy>always</updatePolicy>
            <checksumPolicy>warn</checksumPolicy>
        </releases>
        <snapshots>
            <enabled>false</enabled>
            <updatePolicy>never</updatePolicy>
            <checksumPolicy>fail</checksumPolicy>
        </snapshots>
        <id>HDPJetty</id>
        <name>Hadoop Jetty</name>
        <url>https://repo.hortonworks.com/content/repositories/jetty-hadoop/</url>
        <layout>default</layout>
    </repository>
</repositories>

Add properties

Maven allows you to define project-level values called properties. In pom.xml, add the following text after the </repositories> line:

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <!--
    This is a version of Storm from the Hortonworks repository that is compatible with HDInsight 3.6.
    -->
    <storm.version>1.1.0.2.6.1.9-1</storm.version>
</properties>

You can now use this value in other sections of the pom.xml. For example, when specifying the version of Storm components, you can use ${storm.version} instead of hard-coding a value.

Add dependencies

Add a dependency for Storm components. In pom.xml, add the following text in the <dependencies> section:

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>${storm.version}</version>
    <!-- keep storm out of the jar-with-dependencies -->
    <scope>provided</scope>
</dependency>

At compile time, Maven uses this information to look up storm-core in the Maven repository. It first looks in the repository on your local computer. If the files aren't there, Maven downloads them from the public Maven repository and stores them in the local repository.

Note

Notice the <scope>provided</scope> line in this section. This setting tells Maven to exclude storm-core from any JAR files that are created, because it's provided by the system.

Build configuration

Maven plug-ins allow you to customize the build stages of the project, such as how the project is compiled or how it's packaged into a JAR file. In pom.xml, add the following text directly above the </project> line:

<build>
    <plugins>
    </plugins>
    <resources>
    </resources>
</build>

This section is used to add plug-ins, resources, and other build configuration options. For a full reference of the pom.xml file, see https://maven.apache.org/pom.html.

Add plug-ins

  • Exec Maven Plugin

    For Apache Storm topologies implemented in Java, the Exec Maven Plugin is useful because it lets you easily run the topology locally in your development environment. Add the following to the <plugins> section of the pom.xml file to include the Exec Maven plug-in:

    <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>exec-maven-plugin</artifactId>
        <version>1.6.0</version>
        <executions>
            <execution>
            <goals>
                <goal>exec</goal>
            </goals>
            </execution>
        </executions>
        <configuration>
            <executable>java</executable>
            <includeProjectDependencies>true</includeProjectDependencies>
            <includePluginDependencies>false</includePluginDependencies>
            <classpathScope>compile</classpathScope>
            <mainClass>${storm.topology}</mainClass>
            <cleanupDaemonThreads>false</cleanupDaemonThreads> 
        </configuration>
    </plugin>
    
  • Apache Maven Compiler Plugin

    Another useful plug-in is the Apache Maven Compiler Plugin, which is used to change compilation options. Change the Java version that Maven uses for the source and target of your application.

    • For HDInsight 3.4 or earlier, set the source and target Java version to 1.7.

    • For HDInsight 3.5, set the source and target Java version to 1.8.

      Add the following text in the <plugins> section of the pom.xml file to include the Apache Maven Compiler plug-in. This example specifies 1.8, so the target HDInsight version is 3.5.

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.3</version>
        <configuration>
        <source>1.8</source>
        <target>1.8</target>
        </configuration>
      </plugin>
      

Configure resources

The resources section allows you to include non-code resources, such as configuration files needed by components in the topology. For this example, add the following text in the <resources> section of the pom.xml file:

<resource>
    <directory>${basedir}/resources</directory>
    <filtering>false</filtering>
    <includes>
        <include>log4j2.xml</include>
    </includes>
</resource>

This example adds the resources directory in the root of the project (${basedir}) as a location that contains resources, and includes the file named log4j2.xml. This file is used to configure what information is logged by the topology.

Create the topology

A Java-based Apache Storm topology consists of three components that you must author (or reference) as a dependency:

  • Spouts: Read data from external sources and emit streams of data into the topology.

  • Bolts: Process streams emitted by spouts or other bolts, and emit one or more streams.

  • Topology: Defines how the spouts and bolts are arranged, and provides the entry point for the topology.

Create the spout

To reduce the requirements for setting up external data sources, the following spout simply emits random sentences. It's a modified version of a spout provided with the Storm-Starter examples. Although this topology uses only one spout, others may have several spouts that feed data from different sources into the topology.

Enter the command below to create and open a new file RandomSentenceSpout.java:

notepad src\main\java\com\microsoft\example\RandomSentenceSpout.java

Copy and paste the Java code below into the new file, and then close the file.

package com.microsoft.example;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;
import java.util.Random;

//This spout randomly emits sentences
public class RandomSentenceSpout extends BaseRichSpout {
  //Collector used to emit output
  SpoutOutputCollector _collector;
  //Used to generate a random number
  Random _rand;

  //Open is called when an instance of the class is created
  @Override
  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
  //Set the instance collector to the one passed in
    _collector = collector;
    //For randomness
    _rand = new Random();
  }

  //Emit data to the stream
  @Override
  public void nextTuple() {
  //Sleep for a bit
    Utils.sleep(100);
    //The sentences that are randomly emitted
    String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
        "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
    //Randomly pick a sentence
    String sentence = sentences[_rand.nextInt(sentences.length)];
    //Emit the sentence
    _collector.emit(new Values(sentence));
  }

  //Ack is not implemented since this is a basic example
  @Override
  public void ack(Object id) {
  }

  //Fail is not implemented since this is a basic example
  @Override
  public void fail(Object id) {
  }

  //Declare the output fields. In this case, a sentence
  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("sentence"));
  }
}

Note

For an example of a spout that reads from an external data source, see one of the following examples:

Create the bolts

Bolts handle the data processing. Bolts can do anything, for example, computation, persistence, or talking to external components. This topology uses two bolts:

  • SplitSentence: Splits the sentences emitted by RandomSentenceSpout into individual words.

  • WordCount: Counts how many times each word has occurred.
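The core logic of these two bolts can be exercised outside of Storm. The following standalone class is an illustrative sketch (the class and method names are not part of the topology): it uses the same BreakIterator-based word splitting and HashMap-based counting that the bolts below implement.

```java
import java.text.BreakIterator;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BoltLogicSketch {
    // Same word-splitting approach the SplitSentence bolt uses
    static List<String> split(String sentence) {
        List<String> words = new ArrayList<>();
        BreakIterator boundary = BreakIterator.getWordInstance();
        boundary.setText(sentence);
        int start = boundary.first();
        for (int end = boundary.next(); end != BreakIterator.DONE;
             start = end, end = boundary.next()) {
            // BreakIterator also returns whitespace tokens; drop them
            String word = sentence.substring(start, end).replaceAll("\\s+", "");
            if (!word.equals("")) {
                words.add(word);
            }
        }
        return words;
    }

    public static void main(String[] args) {
        // Same counting approach the WordCount bolt uses
        Map<String, Integer> counts = new HashMap<>();
        for (String word : split("the cow jumped over the moon")) {
            counts.merge(word, 1, Integer::sum);
        }
        System.out.println(counts.get("the")); // prints 2
    }
}
```

In the actual topology, the splitting and counting run in separate bolts so that Storm can parallelize and distribute each stage independently.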

SplitSentence

Enter the command below to create and open a new file SplitSentence.java:

notepad src\main\java\com\microsoft\example\SplitSentence.java

Copy and paste the Java code below into the new file, and then close the file.

package com.microsoft.example;

import java.text.BreakIterator;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

//There are a variety of bolt types. In this case, use BaseBasicBolt
public class SplitSentence extends BaseBasicBolt {

  //Execute is called to process tuples
  @Override
  public void execute(Tuple tuple, BasicOutputCollector collector) {
    //Get the sentence content from the tuple
    String sentence = tuple.getString(0);
    //An iterator to get each word
    BreakIterator boundary=BreakIterator.getWordInstance();
    //Give the iterator the sentence
    boundary.setText(sentence);
    //Find the beginning first word
    int start=boundary.first();
    //Iterate over each word and emit it to the output stream
    for (int end=boundary.next(); end != BreakIterator.DONE; start=end, end=boundary.next()) {
      //get the word
      String word=sentence.substring(start,end);
      //If a word is whitespace characters, replace it with empty
      word=word.replaceAll("\\s+","");
      //if it's an actual word, emit it
      if (!word.equals("")) {
        collector.emit(new Values(word));
      }
    }
  }

  //Declare that emitted tuples contain a word field
  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
  }
}

WordCount

Enter the command below to create and open a new file WordCount.java:

notepad src\main\java\com\microsoft\example\WordCount.java

Copy and paste the Java code below into the new file, and then close the file.

package com.microsoft.example;

import java.util.HashMap;
import java.util.Map;
import java.util.Iterator;

import org.apache.storm.Constants;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.Config;

// For logging
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;

//There are a variety of bolt types. In this case, use BaseBasicBolt
public class WordCount extends BaseBasicBolt {
  //Create logger for this class
  private static final Logger logger = LogManager.getLogger(WordCount.class);
  //For holding words and counts
  Map<String, Integer> counts = new HashMap<String, Integer>();
  //How often to emit a count of words
  private Integer emitFrequency;

  // Default constructor
  public WordCount() {
      emitFrequency=5; // Default to 5 seconds
  }

  // Constructor that sets emit frequency
  public WordCount(Integer frequency) {
      emitFrequency=frequency;
  }

  //Configure frequency of tick tuples for this bolt
  //This delivers a 'tick' tuple on a specific interval,
  //which is used to trigger certain actions
  @Override
  public Map<String, Object> getComponentConfiguration() {
      Config conf = new Config();
      conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequency);
      return conf;
  }

  //execute is called to process tuples
  @Override
  public void execute(Tuple tuple, BasicOutputCollector collector) {
    //If it's a tick tuple, emit all words and counts
    if(tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
            && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)) {
      for(String word : counts.keySet()) {
        Integer count = counts.get(word);
        collector.emit(new Values(word, count));
        logger.info("Emitting a count of " + count + " for word " + word);
      }
    } else {
      //Get the word contents from the tuple
      String word = tuple.getString(0);
      //Have we counted any already?
      Integer count = counts.get(word);
      if (count == null)
        count = 0;
      //Increment the count and store it
      count++;
      counts.put(word, count);
    }
  }

  //Declare that this emits a tuple containing two fields; word and count
  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word", "count"));
  }
}

Define the topology

The topology ties the spouts and bolts together into a graph, which defines how data flows between the components. It also provides parallelism hints that Storm uses when creating instances of the components within the cluster.

The following image is a basic diagram of the graph of components for this topology.

Diagram showing how the spouts and bolts are arranged

To implement the topology, enter the command below to create and open a new file WordCountTopology.java:

notepad src\main\java\com\microsoft\example\WordCountTopology.java

Copy and paste the Java code below into the new file, and then close the file.

package com.microsoft.example;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

import com.microsoft.example.RandomSentenceSpout;

public class WordCountTopology {

  //Entry point for the topology
  public static void main(String[] args) throws Exception {
  //Used to build the topology
    TopologyBuilder builder = new TopologyBuilder();
    //Add the spout, with a name of 'spout'
    //and parallelism hint of 5 executors
    builder.setSpout("spout", new RandomSentenceSpout(), 5);
    //Add the SplitSentence bolt, with a name of 'split'
    //and parallelism hint of 8 executors
    //shufflegrouping subscribes to the spout, and equally distributes
    //tuples (sentences) across instances of the SplitSentence bolt
    builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
    //Add the counter, with a name of 'count'
    //and parallelism hint of 12 executors
    //fieldsgrouping subscribes to the split bolt, and
    //ensures that the same word is sent to the same instance (group by field 'word')
    builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

    //new configuration
    Config conf = new Config();
    //Set to false to disable debug information when
    // running in production on a cluster
    conf.setDebug(false);

    //If there are arguments, we are running on a cluster
    if (args != null && args.length > 0) {
      //parallelism hint to set the number of workers
      conf.setNumWorkers(3);
      //submit the topology
      StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    }
    //Otherwise, we are running locally
    else {
      //Cap the maximum number of executors that can be spawned
      //for a component to 3
      conf.setMaxTaskParallelism(3);
      //LocalCluster is used to run locally
      LocalCluster cluster = new LocalCluster();
      //submit the topology
      cluster.submitTopology("word-count", conf, builder.createTopology());
      //sleep
      Thread.sleep(10000);
      //shut down the cluster
      cluster.shutdown();
    }
  }
}

Configure logging

Storm uses Apache Log4j 2 to log information. If you don't configure logging, the topology emits diagnostic information. To control what is logged, create a file named log4j2.xml in the resources directory by entering the command below:

notepad resources\log4j2.xml

Copy and paste the XML below into the new file, and then close the file.

<?xml version="1.0" encoding="UTF-8"?>
<Configuration>
<Appenders>
    <Console name="STDOUT" target="SYSTEM_OUT">
        <PatternLayout pattern="%d{HH:mm:ss} [%t] %-5level %logger{36} - %msg%n"/>
    </Console>
</Appenders>
<Loggers>
    <Logger name="com.microsoft.example" level="trace" additivity="false">
        <AppenderRef ref="STDOUT"/>
    </Logger>
    <Root level="error">
        <AppenderRef ref="STDOUT"/>
    </Root>
</Loggers>
</Configuration>

This XML configures a new logger for the com.microsoft.example package, which includes the components in this example topology. The level is set to trace for this logger, which captures any logging information emitted by components in this topology.

The <Root level="error"> section configures the root level of logging (everything not in com.microsoft.example) to log only error information.

For more information on configuring Log4j 2 logging, see https://logging.apache.org/log4j/2.x/manual/configuration.html.

Note

Storm version 0.10.0 and higher uses Log4j 2.x. Older versions of Storm used Log4j 1.x, which used a different format for log configuration. For information on the older configuration, see https://wiki.apache.org/logging-log4j/Log4jXmlFormat.

Test the topology locally

After you save the files, use the following command to test the topology locally.

mvn compile exec:java -Dstorm.topology=com.microsoft.example.WordCountTopology

As it runs, the topology displays startup information. The following text is an example of the word-count output:

17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 56 for word snow
17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 56 for word white
17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 112 for word seven
17:33:27 [Thread-16-count] INFO  com.microsoft.example.WordCount - Emitting a count of 195 for word the
17:33:27 [Thread-30-count] INFO  com.microsoft.example.WordCount - Emitting a count of 113 for word and
17:33:27 [Thread-30-count] INFO  com.microsoft.example.WordCount - Emitting a count of 57 for word dwarfs
17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 57 for word snow

This example log indicates that the word 'and' has been emitted 113 times. The count continues to increase as long as the topology runs, because the spout continuously emits the same sentences.

There is a 5-second interval between emissions of words and counts. The WordCount component is configured to emit information only when a tick tuple arrives, and it requests that tick tuples be delivered only every five seconds.

Convert the topology to Flux

Flux is a framework available with Storm 0.10.0 and higher that allows you to separate configuration from implementation. Your components are still defined in Java, but the topology is defined using a YAML file. You can package a default topology definition with your project, or use a standalone file when submitting the topology. When submitting the topology to Storm, you can use environment variables or configuration files to populate values in the YAML topology definition.

The YAML file defines the components to use for the topology and the data flow between them. You can include a YAML file as part of the jar file, or you can use an external YAML file.

For more information on Flux, see the Flux framework documentation (https://storm.apache.org/releases/1.0.6/flux.html).

Warning

Due to a bug (https://issues.apache.org/jira/browse/STORM-2055) with Storm 1.0.1, you may need to install a Storm development environment to run Flux topologies locally.

  1. Previously, WordCountTopology.java defined the topology, but it isn't needed with Flux. Delete the file with the following command:

    DEL src\main\java\com\microsoft\example\WordCountTopology.java
    
  2. Enter the command below to create and open a new file topology.yaml:

    notepad resources\topology.yaml
    

    Copy and paste the text below into the new file, and then close the file.

    name: "wordcount"       # friendly name for the topology
    
    config:                 # Topology configuration
      topology.workers: 1     # Hint for the number of workers to create
    
    spouts:                 # Spout definitions
    - id: "sentence-spout"
      className: "com.microsoft.example.RandomSentenceSpout"
      parallelism: 1      # parallelism hint
    
    bolts:                  # Bolt definitions
    - id: "splitter-bolt"
      className: "com.microsoft.example.SplitSentence"
      parallelism: 1
    
    - id: "counter-bolt"
      className: "com.microsoft.example.WordCount"
      constructorArgs:
        - 10
      parallelism: 1
    
    streams:                # Stream definitions
    - name: "Spout --> Splitter" # name isn't used (placeholder for logging, UI, etc.)
      from: "sentence-spout"       # The stream emitter
      to: "splitter-bolt"          # The stream consumer
      grouping:                    # Grouping type
        type: SHUFFLE
    
    - name: "Splitter -> Counter"
      from: "splitter-bolt"
      to: "counter-bolt"
      grouping:
        type: FIELDS
        args: ["word"]           # field(s) to group on
    
  3. Enter the command below to open pom.xml and make the revisions described below:

    notepad pom.xml
    
    • Add the following new dependency in the <dependencies> section:

      <!-- Add a dependency on the Flux framework -->
      <dependency>
          <groupId>org.apache.storm</groupId>
          <artifactId>flux-core</artifactId>
          <version>${storm.version}</version>
      </dependency>
      
    • Add the following plug-in to the <plugins> section. This plug-in handles the creation of a package (jar file) for the project, and applies some Flux-specific transformations when creating the package.

      <!-- build an uber jar -->
      <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-shade-plugin</artifactId>
          <version>3.2.1</version>
          <configuration>
              <transformers>
                  <!-- Keep us from getting a "can't overwrite file error" -->
                  <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer" />
                  <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                  <!-- We're using Flux, so refer to it as main -->
                  <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                      <mainClass>org.apache.storm.flux.Flux</mainClass>
                  </transformer>
              </transformers>
              <!-- Keep us from getting a bad signature error -->
              <filters>
                  <filter>
                      <artifact>*:*</artifact>
                      <excludes>
                          <exclude>META-INF/*.SF</exclude>
                          <exclude>META-INF/*.DSA</exclude>
                          <exclude>META-INF/*.RSA</exclude>
                      </excludes>
                  </filter>
              </filters>
          </configuration>
          <executions>
              <execution>
                  <phase>package</phase>
                  <goals>
                      <goal>shade</goal>
                  </goals>
              </execution>
          </executions>
      </plugin>
      
    • In the exec-maven-plugin <configuration> section, change the value of <mainClass> from ${storm.topology} to org.apache.storm.flux.Flux. This setting allows Flux to handle running the topology locally during development.

    • In the <resources> section, add the following to <includes>. This XML includes the YAML file that defines the topology as part of the project.

      <include>topology.yaml</include>
      

Test the Flux topology locally

  1. Enter the following command to compile and execute the Flux topology using Maven:

    mvn compile exec:java -Dexec.args="--local -R /topology.yaml"
    

    Warning

    If your topology uses Storm 1.0.1 bits, this command fails. This failure is caused by https://issues.apache.org/jira/browse/STORM-2055. Instead, install Storm in your development environment and use the following commands:

    mvn compile package
    storm jar target/WordCount-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local -R /topology.yaml
    

    --local 参数在开发环境中以本地模式运行拓扑。The --local parameter runs the topology in local mode on your development environment. -R /topology.yaml 参数使用 jar 文件中的 topology.yaml 文件资源来定义拓扑。The -R /topology.yaml parameter uses the topology.yaml file resource from the jar file to define the topology.

    运行该命令时,拓扑显示启动信息。As it runs, the topology displays startup information. 以下文本是输出的示例:The following text is an example of the output:

     17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 56 for word snow
     17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 56 for word white
     17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 112 for word seven
     17:33:27 [Thread-16-count] INFO  com.microsoft.example.WordCount - Emitting a count of 195 for word the
     17:33:27 [Thread-30-count] INFO  com.microsoft.example.WordCount - Emitting a count of 113 for word and
     17:33:27 [Thread-30-count] INFO  com.microsoft.example.WordCount - Emitting a count of 57 for word dwarfs
    

    不同批次的日志记录信息之间存在 10 秒的延迟。There is a 10-second delay between batches of logged information.

  2. 基于项目创建新的拓扑 YAML 文件。Create a new topology YAML file from the project.

    a.a. 输入以下命令打开 topology.yamlEnter the command below to open topology.yaml:

    notepad resources\topology.yaml
    

    b.b. 找到以下节,将 10 的值更改为 5Find the following section and change the value of 10 to 5. 此修改会将发出单词计数批的间隔时间从 10 秒更改为 5 秒。This modification changes the interval between emitting batches of word counts from 10 seconds to 5.

    - id: "counter-bolt"
      className: "com.microsoft.example.WordCount"
      constructorArgs:
        - 5
      parallelism: 1  
    

    c.c. 将文件另存为 newtopology.yamlSave the file as newtopology.yaml.

  3. 若要运行拓扑,请输入以下命令:To run the topology, enter the following command:

    mvn exec:java -Dexec.args="--local resources/newtopology.yaml"
    

    或者,如果开发环境中有 Storm,则执行以下操作:Or, if you have Storm on your development environment:

    storm jar target/WordCount-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local resources/newtopology.yaml
    

    此命令使用 newtopology.yaml 作为拓扑定义。This command uses the newtopology.yaml as the topology definition. 由于没有包含 compile 参数,Maven 使用前面步骤中生成的项目的版本。Since we didn't include the compile parameter, Maven uses the version of the project built in previous steps.

    启动拓扑后,你将发现,发出批的间隔时间已更改,会反映 newtopology.yaml 中的值。Once the topology starts, you should notice that the time between emitted batches has changed to reflect the value in newtopology.yaml. 因此可以看到,无需重新编译拓扑即可通过 YAML 文件更改配置。So you can see that you can change your configuration through a YAML file without having to recompile the topology.
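constructorArgs 中的值将传递给 Bolt 的构造函数,作为发出批次的间隔秒数。The value in constructorArgs is passed into the bolt's constructor as the emit interval in seconds. 下面是计数逻辑的一个极简示意(不依赖 Storm;类名和方法名均为说明用途的假设,并非项目的实际代码):The following is a minimal, Storm-free sketch of that counting logic (class and method names are illustrative assumptions, not the project's actual code):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: accumulate word counts and flush them on a
// "tick", the way the counter bolt emits a batch every N seconds. The
// interval arrives through the constructor, as constructorArgs does in Flux.
public class CounterSketch {
    private final int emitFrequencyInSeconds; // e.g. 5, from constructorArgs
    private final Map<String, Integer> counts = new HashMap<>();

    public CounterSketch(int emitFrequencyInSeconds) {
        this.emitFrequencyInSeconds = emitFrequencyInSeconds;
    }

    public int getEmitFrequency() {
        return emitFrequencyInSeconds;
    }

    // Called once per incoming word, like execute() for a normal tuple.
    public void count(String word) {
        counts.merge(word, 1, Integer::sum);
    }

    // Called when a tick arrives: snapshot and reset the counts.
    public Map<String, Integer> emit() {
        Map<String, Integer> snapshot = new HashMap<>(counts);
        counts.clear();
        return snapshot;
    }

    public static void main(String[] args) {
        CounterSketch bolt = new CounterSketch(5);
        for (String w : "snow white and the seven dwarfs and the".split(" ")) {
            bolt.count(w);
        }
        System.out.println(bolt.emit()); // one batch of counts, then reset
    }
}
```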

有关 Flux 框架的上述功能和其他功能的详细信息,请参阅 Flux (https://storm.apache.org/releases/current/flux.html)。For more information on these and other features of the Flux framework, see Flux (https://storm.apache.org/releases/current/flux.html).

TridentTrident

Trident 是 Storm 提供的高级抽象。Trident is a high-level abstraction that is provided by Storm. 它支持有状态处理。It supports stateful processing. Trident 的主要优点在于,它可以保证进入拓扑的每个消息只会处理一次。The primary advantage of Trident is that it can guarantee that every message that enters the topology is processed only once. 如果不使用 Trident,则拓扑只能保证至少将消息处理一次。Without using Trident, your topology can only guarantee that messages are processed at least once. 两者还有其他方面的差异,例如,可以使用内置组件,而无需创建 Bolt。There are also other differences, such as built-in components that can be used instead of creating bolts. 事实上,可以使用低泛型组件(例如筛选、投影和函数)来取代 Bolt。In fact, bolts are replaced by less-generic components, such as filters, projections, and functions.

可以使用 Maven 项目来创建 Trident 应用程序。Trident applications can be created by using Maven projects. 使用本文前面所述的相同基本步骤 - 只有代码不同。You use the same basic steps as presented earlier in this article—only the code is different. Trident(目前)还不能与 Flux 框架配合使用。Trident also cannot (currently) be used with the Flux framework.
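作为示意,以下代码片段按照官方 Trident 教程的风格展示了一个单词计数流(仅为草图,需要 storm-core 依赖项才能编译;FixedBatchSpout 和 MemoryMapState 是 Storm 自带的测试辅助类):As a sketch, the snippet below outlines a word-count stream in the style of the official Trident tutorial (illustrative only, and it needs the storm-core dependency to compile; FixedBatchSpout and MemoryMapState are test helpers that ship with Storm):

```java
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.operation.builtin.Count;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.trident.testing.MemoryMapState;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

// Illustrative sketch: built-in Trident operations (functions, groupBy,
// aggregates) take the place of the custom bolts used earlier.
public class TridentWordCountSketch {

    // A Trident "function" that splits a sentence into words.
    public static class Split extends BaseFunction {
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            for (String word : tuple.getString(0).split(" ")) {
                collector.emit(new Values(word));
            }
        }
    }

    public static TridentTopology build() {
        // Test spout that replays fixed batches of sentences.
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
                new Values("the cow jumped over the moon"),
                new Values("the man went to the store"));
        spout.setCycle(true);

        TridentTopology topology = new TridentTopology();
        topology.newStream("sentences", spout)
                .each(new Fields("sentence"), new Split(), new Fields("word"))
                .groupBy(new Fields("word"))
                // Stateful aggregation with exactly-once semantics.
                .persistentAggregate(new MemoryMapState.Factory(), new Count(),
                        new Fields("count"));
        return topology;
    }
}
```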

有关 Trident 的详细信息,请参阅 Trident API 概述For more information about Trident, see the Trident API Overview.

后续步骤Next Steps

已学习如何使用 Java 创建 Apache Storm 拓扑。You have learned how to create an Apache Storm topology by using Java.

如需更多 Apache Storm 拓扑示例,请访问 Apache Storm on HDInsight 示例拓扑You can find more example Apache Storm topologies by visiting Example topologies for Apache Storm on HDInsight.