Develop Apache Storm topologies using Python on HDInsight

Learn how to create an Apache Storm topology that uses Python components. Apache Storm supports multiple languages and even lets you combine components written in several languages in one topology. The Flux framework (introduced with Storm 0.10.0) makes it easy to create solutions that use Python components.

Important

The information in this document was tested using Storm on HDInsight 3.6.

Prerequisites

Storm multi-language support

Apache Storm was designed to work with components written in any programming language. The components must understand how to work with the Thrift definition for Storm. For Python, the Apache Storm project provides a module that lets you easily interface with Storm. You can find this module at https://github.com/apache/storm/blob/master/storm-multilang/python/src/main/resources/resources/storm.py.

Storm is a Java process that runs on the Java Virtual Machine (JVM). Components written in other languages are executed as subprocesses. Storm communicates with these subprocesses using JSON messages sent over stdin/stdout. More details on communication between components can be found in the Multi-lang Protocol documentation.
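In the multi-lang protocol, each message is a JSON document followed by a line that contains only `end`. The following Python sketch shows just that framing, using an in-memory buffer in place of stdin/stdout. It is not a replacement for storm.py, and the helper names `write_message` and `read_message` are hypothetical.

```python
import io
import json


def write_message(stream, msg):
    """Write one multi-lang message: a JSON document followed by an 'end' line."""
    stream.write(json.dumps(msg) + "\nend\n")
    stream.flush()


def read_message(stream):
    """Read lines up to the 'end' terminator and decode the JSON payload.

    Raises EOFError if the stream closes before a complete message arrives.
    """
    lines = []
    while True:
        line = stream.readline()
        if not line:
            raise EOFError("stream closed before 'end' terminator")
        line = line.rstrip("\n")
        if line == "end":
            break
        lines.append(line)
    return json.loads("\n".join(lines))


if __name__ == "__main__":
    # Simulate the stdin/stdout pipe with a StringIO buffer.
    buf = io.StringIO()
    write_message(buf, {"command": "emit", "tuple": ["the cow jumped over the moon"]})
    buf.seek(0)
    print(read_message(buf))
```

In a real component, storm.py performs this reading and writing on sys.stdin and sys.stdout for you, along with the initial handshake.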

Python with the Flux framework

The Flux framework allows you to define Storm topologies separately from the components. The Flux framework uses YAML to define the Storm topology. The following text is an example of how to reference a Python component in the YAML document:

# Spout definitions
spouts:
  - id: "sentence-spout"
    className: "org.apache.storm.flux.wrappers.spouts.FluxShellSpout"
    constructorArgs:
      # Command line
      - ["python", "sentencespout.py"]
      # Output field(s)
      - ["sentence"]
    # parallelism hint
    parallelism: 1

The FluxShellSpout class is used to start the sentencespout.py script that implements the spout.
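A spout script like sentencespout.py typically subclasses the `Spout` class from storm.py and emits tuples from its `nextTuple` method. The following is a hedged sketch, not the sample's actual script: the sentence list and the `next_sentence` helper are hypothetical, and the `storm` import is guarded so that the selection logic can be tried outside a Storm worker.

```python
import random

# Hypothetical sample data; the real sentencespout.py may use other sentences.
SENTENCES = [
    "the cow jumped over the moon",
    "an apple a day keeps the doctor away",
    "four score and seven years ago",
    "snow white and the seven dwarfs",
]


def next_sentence(rng=random):
    """Pick one sentence at random; this is the value the spout emits."""
    return rng.choice(SENTENCES)


try:
    import storm  # storm.py, which the build packages into the topology jar
except ImportError:  # running outside Storm, for example in a unit test
    storm = None

if storm is not None:

    class SentenceSpout(storm.Spout):
        def nextTuple(self):
            # One value per tuple, matching the single output field "sentence".
            storm.emit([next_sentence()])

    if __name__ == "__main__":
        # run() performs the handshake, then calls nextTuple on each "next" command.
        SentenceSpout().run()
```

Keeping the data logic in a plain function makes it testable without a running cluster; only the thin `SentenceSpout` wrapper depends on storm.py.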

Flux expects the Python scripts to be in the /resources directory inside the jar file that contains the topology. So this example stores the Python scripts in the /multilang/resources directory, and the pom.xml adds /multilang as a resource directory so that its resources subdirectory becomes /resources in the jar. The pom.xml does this using the following XML:

<!-- include the Python components -->
<resource>
    <directory>${basedir}/multilang</directory>
    <filtering>false</filtering>
</resource>

As mentioned earlier, there is a storm.py file that implements the Thrift definition for Storm. The Flux framework includes storm.py automatically when the project is built, so you don't have to include it yourself.
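Because a jar is a ZIP archive, you can check that the build placed the Python scripts under resources/ by listing its entries with Python's zipfile module. A minimal sketch; the helper name `python_resources` is hypothetical.

```python
import zipfile


def python_resources(jar):
    """Return the sorted .py entries stored under resources/ in a jar (ZIP) file.

    `jar` may be a file path or a binary file-like object.
    """
    with zipfile.ZipFile(jar) as archive:
        return sorted(
            name
            for name in archive.namelist()
            if name.startswith("resources/") and name.endswith(".py")
        )
```

For example, after building, `python_resources("target/WordCount-1.0-SNAPSHOT.jar")` should list the topology's scripts; if storm.py is packaged as described above, you would expect it to appear there as well.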

Build the project

  1. Download the project from https://github.com/Azure-Samples/hdinsight-python-storm-wordcount.

  2. Open a command prompt and navigate to the project root: hdinsight-python-storm-wordcount-master. Enter the following command:

    mvn clean compile package
    

This command creates a target/WordCount-1.0-SNAPSHOT.jar file that contains the compiled topology.

Run the Storm topology on HDInsight

  1. Use the scp command to copy the WordCount-1.0-SNAPSHOT.jar file to your Storm on HDInsight cluster. Edit the command below by replacing CLUSTERNAME with the name of your cluster, and then enter the command:

    scp target/WordCount-1.0-SNAPSHOT.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.cn:
    
  2. Once the file has been uploaded, connect to the cluster using SSH:

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.cn
    
  3. From the SSH session, use the following command to start the topology on the cluster:

    storm jar WordCount-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux -r -R /topology.yaml
    

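    The -r option submits the topology to the cluster, and -R tells Flux to load /topology.yaml as a resource from the jar. Once started, a Storm topology runs until it is stopped.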

  4. Use the Storm UI to view the topology on the cluster. The Storm UI is located at https://CLUSTERNAME.azurehdinsight.cn/stormui. Replace CLUSTERNAME with your cluster name.

  5. Stop the Storm topology. Use the following command to stop the topology on the cluster:

    storm kill wordcount
    

    Alternatively, you can use the Storm UI. Under Topology actions for the topology, select Kill.
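If you repeat these steps often, you might script them. The sketch below only builds the argument lists for the scp, ssh, and storm commands shown above, assuming the default sshuser account, the .azurehdinsight.cn host names used in this article, and that the storm command is on the PATH of a non-interactive ssh session. The function names `deploy_commands`, `kill_command`, and `deploy` are hypothetical.

```python
import subprocess

JAR = "target/WordCount-1.0-SNAPSHOT.jar"


def _host(cluster, user):
    # Assumed SSH endpoint format, matching the commands in this article.
    return f"{user}@{cluster}-ssh.azurehdinsight.cn"


def deploy_commands(cluster, user="sshuser", jar=JAR):
    """Build the scp and ssh command lines that upload and submit the topology."""
    host = _host(cluster, user)
    jar_name = jar.rsplit("/", 1)[-1]
    upload = ["scp", jar, f"{host}:"]
    # ssh runs the final argument as a command on the cluster head node.
    submit = ["ssh", host,
              f"storm jar {jar_name} org.apache.storm.flux.Flux -r -R /topology.yaml"]
    return [upload, submit]


def kill_command(cluster, user="sshuser", topology="wordcount"):
    """Build the ssh command line that stops the running topology."""
    return ["ssh", _host(cluster, user), f"storm kill {topology}"]


def deploy(cluster, dry_run=True):
    """Print the commands (default) or run them, stopping at the first failure."""
    for cmd in deploy_commands(cluster):
        if dry_run:
            print(" ".join(cmd))
        else:
            subprocess.run(cmd, check=True)
```

For example, `deploy("mycluster")` prints the commands without running them; `deploy("mycluster", dry_run=False)` executes them, and ssh prompts for a password unless key-based authentication is configured.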

Run the topology locally

To run the topology locally, use the following command. The -l option tells Flux to run the topology in local mode instead of submitting it to a cluster:

storm jar WordCount-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux -l -R /topology.yaml

Note

This command requires a local Storm development environment. For more information, see Setting up a development environment.

Once the topology starts, it emits information to the local console similar to the following text:

24302 [Thread-25-sentence-spout-executor[4 4]] INFO  o.a.s.s.ShellSpout - ShellLog pid:2436, name:sentence-spout Emiting the cow jumped over the moon
24302 [Thread-30] INFO  o.a.s.t.ShellBolt - ShellLog pid:2438, name:splitter-bolt Emitting the
24302 [Thread-28] INFO  o.a.s.t.ShellBolt - ShellLog pid:2437, name:counter-bolt Emitting years:160
24302 [Thread-17-log-executor[3 3]] INFO  o.a.s.f.w.b.LogInfoBolt - {word=the, count=599}
24303 [Thread-17-log-executor[3 3]] INFO  o.a.s.f.w.b.LogInfoBolt - {word=seven, count=302}
24303 [Thread-17-log-executor[3 3]] INFO  o.a.s.f.w.b.LogInfoBolt - {word=dwarfs, count=143}
24303 [Thread-25-sentence-spout-executor[4 4]] INFO  o.a.s.s.ShellSpout - ShellLog pid:2436, name:sentence-spout Emiting the cow jumped over the moon
24303 [Thread-30] INFO  o.a.s.t.ShellBolt - ShellLog pid:2438, name:splitter-bolt Emitting cow
24303 [Thread-17-log-executor[3 3]] INFO  o.a.s.f.w.b.LogInfoBolt - {word=four, count=160}
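The LogInfoBolt lines carry the running word counts. If you capture the console output, a short script can extract the most recent count for each word. A sketch; the regular expression assumes the `{word=..., count=...}` format shown above, and `latest_counts` is a hypothetical helper name.

```python
import re

# Matches the LogInfoBolt payload, for example "LogInfoBolt - {word=the, count=599}".
PATTERN = re.compile(r"LogInfoBolt - \{word=(?P<word>[^,]+), count=(?P<count>\d+)\}")


def latest_counts(lines):
    """Return {word: count}, keeping the last count logged for each word."""
    counts = {}
    for line in lines:
        match = PATTERN.search(line)
        if match:
            counts[match.group("word")] = int(match.group("count"))
    return counts
```

Because `lines` can be any iterable of strings, you can pass an open file directly, for example `latest_counts(open("storm-local.log"))`, where the file name is hypothetical.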

To stop the topology, use Ctrl + C.

Next steps

See the following documents for other ways to use Python with HDInsight: How to use Python User Defined Functions (UDF) in Apache Pig and Apache Hive.