Develop C# topologies for Apache Storm by using the Data Lake tools for Visual Studio

Learn how to create a C# Apache Storm topology by using the Azure Data Lake (Apache Hadoop) tools for Visual Studio. This document walks through the process of creating a Storm project in Visual Studio, testing it locally, and deploying it to an Apache Storm on Azure HDInsight cluster.

You also learn how to create hybrid topologies that use C# and Java components.

Note

While the steps in this document rely on a Windows development environment with Visual Studio, the compiled project can be submitted to either a Linux-based or a Windows-based HDInsight cluster. Only Linux-based clusters created after October 28, 2016, support SCP.NET topologies.

To use a C# topology with a Linux-based cluster, you must update the Microsoft.SCP.Net.SDK NuGet package used by your project to version 0.10.0.6 or later. The version of the package must also match the major version of Storm installed on HDInsight.

HDInsight version    Apache Storm version    SCP.NET version                              Default Mono version
3.3                  0.10.x                  0.10.x.x (only on Windows-based HDInsight)   NA
3.4                  0.10.0.x                0.10.0.x                                     3.2.8
3.5                  1.0.2.x                 1.0.0.x                                      4.2.1
3.6                  1.1.0.x                 1.0.0.x                                      4.2.8
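The package update can be performed from the NuGet Package Manager Console in Visual Studio. A minimal sketch (the version number shown is illustrative; pick the one that matches your cluster from the table above):

```powershell
Install-Package Microsoft.SCP.Net.SDK -Version 0.10.0.6
```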

Important

C# topologies on Linux-based clusters must use .NET 4.5, and use Mono to run on the HDInsight cluster. Check Mono compatibility for potential incompatibilities.

Install Visual Studio

You can develop C# topologies with SCP.NET by using one of the following versions of Visual Studio:

Install Data Lake tools for Visual Studio

To install Data Lake tools for Visual Studio, follow the steps in Get started using Data Lake tools for Visual Studio.

Install Java

When you submit a Storm topology from Visual Studio, SCP.NET generates a zip file that contains the topology and its dependencies. Java is used to create these zip files because it produces a format that is more compatible with Linux-based clusters.

  1. Install the Java Developer Kit (JDK) 7 or later on your development environment. You can get the Oracle JDK from Oracle. You can also use other Java distributions.

  2. The JAVA_HOME environment variable must point to the directory that contains Java.

  3. The PATH environment variable must include the %JAVA_HOME%\bin directory.

You can use the following C# console application to verify that Java and the JDK are correctly installed:

using System;
using System.IO;

namespace ConsoleApplication2
{
    class Program
    {
        static void Main(string[] args)
        {
            string javaHome = Environment.GetEnvironmentVariable("JAVA_HOME");
            if (!string.IsNullOrEmpty(javaHome))
            {
                string jarExe = Path.Combine(javaHome, "bin", "jar.exe");
                if (File.Exists(jarExe))
                {
                    Console.WriteLine("Java is installed properly.");
                }
                else
                {
                    Console.WriteLine("A valid Java JDK was not found. It looks like a JRE is installed instead of a JDK.");
                }
            }
            else
            {
                Console.WriteLine("A valid Java JDK was not found. The JAVA_HOME environment variable is not set.");
            }
        }
    }
}

Apache Storm templates

The Data Lake tools for Visual Studio provide the following templates:

Project type                          Demonstrates
Storm Application                     An empty Storm topology project.
Storm Azure SQL Writer Sample         How to write to Azure SQL Database.
Storm Azure Cosmos DB Reader Sample   How to read from Azure Cosmos DB.
Storm Azure Cosmos DB Writer Sample   How to write to Azure Cosmos DB.
Storm EventHub Reader Sample          How to read from Azure Event Hubs.
Storm EventHub Writer Sample          How to write to Azure Event Hubs.
Storm HBase Reader Sample             How to read from HBase on HDInsight clusters.
Storm HBase Writer Sample             How to write to HBase on HDInsight clusters.
Storm Hybrid Sample                   How to use a Java component.
Storm Sample                          A basic word count topology.

Warning

Not all templates work with Linux-based HDInsight. NuGet packages used by the templates may not be compatible with Mono. Check the Mono compatibility document and use the .NET Portability Analyzer to identify potential problems.

In the steps in this document, you use the basic Storm Application project type to create a topology.

Apache HBase template notes

The HBase reader and writer templates use the HBase REST API, not the HBase Java API, to communicate with an HBase on HDInsight cluster.

EventHub template notes

Important

The Java-based EventHub spout component included with the EventHub Reader template may not work with Storm on HDInsight version 3.5 or later. An updated version of this component is available at GitHub.

For an example topology that uses this component and works with Storm on HDInsight 3.5, see GitHub.

Create a C# topology

  1. Open Visual Studio, select File > New, and then select Project.

  2. From the New Project window, expand Installed > Templates, and select Azure Data Lake. From the list of templates, select Storm Application. At the bottom of the screen, enter WordCount as the name of the application.

    Screenshot of the New Project window

  3. After you have created the project, you should have the following files:

    • Program.cs: This file defines the topology for your project. By default, a topology that consists of one spout and one bolt is created.

    • Spout.cs: An example spout that emits random numbers.

    • Bolt.cs: An example bolt that keeps a count of the numbers emitted by the spout.

      When you create the project, NuGet downloads the latest SCP.NET package.

      Important

      The SCP.Net version used in your project must match the Storm version present on your HDInsight cluster. Use the following table to determine which version you should use:

      HDInsight version    Apache Storm version    SCP.NET version                           Default Mono version
      3.3                  0.10.0.x                0.10.0.x (Windows-based clusters only)    NA
      3.4                  0.10.0.x                0.10.0.x                                  3.2.8
      3.5                  1.0.2.x                 1.0.0.x                                   4.2.1
      3.6                  1.1.0.x                 1.0.0.x                                   4.2.8

      For more information on components provided with HDInsight, see HDInsight component versions.

Implement the spout

  1. Open Spout.cs. Spouts are used to read data into a topology from an external source. The main components of a spout are:

    • NextTuple: Called by Storm when the spout is allowed to emit new tuples.

    • Ack (transactional topology only): Handles acknowledgements initiated by other components in the topology for tuples sent from the spout. Acknowledging a tuple lets the spout know that it was processed successfully by downstream components.

    • Fail (transactional topology only): Handles tuples that failed processing in other components in the topology. Implementing a Fail method allows you to re-emit the tuple so that it can be processed again.

  2. Replace the contents of the Spout class with the following code. This spout randomly emits a sentence into the topology.

    private Context ctx;
    private Random r = new Random();
    string[] sentences = new string[] {
        "the cow jumped over the moon",
        "an apple a day keeps the doctor away",
        "four score and seven years ago",
        "snow white and the seven dwarfs",
        "i am at two with nature"
    };
    
    public Spout(Context ctx)
    {
        // Set the instance context
        this.ctx = ctx;
    
        Context.Logger.Info("Generator constructor called");
    
        // Declare Output schema
        Dictionary<string, List<Type>> outputSchema = new Dictionary<string, List<Type>>();
        // The schema for the default output stream is
        // a tuple that contains a string field
        outputSchema.Add("default", new List<Type>() { typeof(string) });
        this.ctx.DeclareComponentSchema(new ComponentStreamSchema(null, outputSchema));
    }
    
    // Get an instance of the spout
    public static Spout Get(Context ctx, Dictionary<string, Object> parms)
    {
        return new Spout(ctx);
    }
    
    public void NextTuple(Dictionary<string, Object> parms)
    {
        Context.Logger.Info("NextTuple enter");
        // The sentence to be emitted
        string sentence;
    
        // Get a random sentence. Note that Next's upper bound is
        // exclusive, so the full range of sentences is used.
        sentence = sentences[r.Next(0, sentences.Length)];
        Context.Logger.Info("Emit: {0}", sentence);
        // Emit it
        this.ctx.Emit(new Values(sentence));
    
        Context.Logger.Info("NextTuple exit");
    }
    
    public void Ack(long seqId, Dictionary<string, Object> parms)
    {
        // Only used for transactional topologies
    }
    
    public void Fail(long seqId, Dictionary<string, Object> parms)
    {
        // Only used for transactional topologies
    }
    

Implement the bolts

  1. Delete the existing Bolt.cs file from the project.

  2. In Solution Explorer, right-click the project, and select Add > New Item. From the list, select Storm Bolt, and enter Splitter.cs as the name. Repeat this process to create a second bolt named Counter.cs.

    • Splitter.cs: Implements a bolt that splits sentences into individual words, and emits a new stream of words.

    • Counter.cs: Implements a bolt that counts each word, and emits a new stream of words and the count for each word.

      Note

      These bolts read from and write to streams, but you can also use a bolt to communicate with sources such as a database or a service.

  3. Open Splitter.cs. By default, it contains only one method: Execute. The Execute method is called when the bolt receives a tuple for processing. Here, you can read and process incoming tuples, and emit outbound tuples.

  4. Replace the contents of the Splitter class with the following code:

    private Context ctx;
    
    // Constructor
    public Splitter(Context ctx)
    {
        Context.Logger.Info("Splitter constructor called");
        this.ctx = ctx;
    
        // Declare Input and Output schemas
        Dictionary<string, List<Type>> inputSchema = new Dictionary<string, List<Type>>();
        // Input contains a tuple with a string field (the sentence)
        inputSchema.Add("default", new List<Type>() { typeof(string) });
        Dictionary<string, List<Type>> outputSchema = new Dictionary<string, List<Type>>();
        // Outbound contains a tuple with a string field (the word)
        outputSchema.Add("default", new List<Type>() { typeof(string) });
        this.ctx.DeclareComponentSchema(new ComponentStreamSchema(inputSchema, outputSchema));
    }
    
    // Get a new instance of the bolt
    public static Splitter Get(Context ctx, Dictionary<string, Object> parms)
    {
        return new Splitter(ctx);
    }
    
    // Called when a new tuple is available
    public void Execute(SCPTuple tuple)
    {
        Context.Logger.Info("Execute enter");
    
        // Get the sentence from the tuple
        string sentence = tuple.GetString(0);
        // Split at space characters
        foreach (string word in sentence.Split(' '))
        {
            Context.Logger.Info("Emit: {0}", word);
            //Emit each word
            this.ctx.Emit(new Values(word));
        }
    
        Context.Logger.Info("Execute exit");
    }
    
  5. Open Counter.cs, and replace the class contents with the following code:

    private Context ctx;
    
    // Dictionary for holding words and counts
    private Dictionary<string, int> counts = new Dictionary<string, int>();
    
    // Constructor
    public Counter(Context ctx)
    {
        Context.Logger.Info("Counter constructor called");
        // Set instance context
        this.ctx = ctx;
    
        // Declare Input and Output schemas
        Dictionary<string, List<Type>> inputSchema = new Dictionary<string, List<Type>>();
        // A tuple containing a string field - the word
        inputSchema.Add("default", new List<Type>() { typeof(string) });
    
        Dictionary<string, List<Type>> outputSchema = new Dictionary<string, List<Type>>();
        // A tuple containing a string and integer field - the word and the word count
        outputSchema.Add("default", new List<Type>() { typeof(string), typeof(int) });
        this.ctx.DeclareComponentSchema(new ComponentStreamSchema(inputSchema, outputSchema));
    }
    
    // Get a new instance
    public static Counter Get(Context ctx, Dictionary<string, Object> parms)
    {
        return new Counter(ctx);
    }
    
    // Called when a new tuple is available
    public void Execute(SCPTuple tuple)
    {
        Context.Logger.Info("Execute enter");
    
        // Get the word from the tuple
        string word = tuple.GetString(0);
        // Do we already have an entry for the word in the dictionary?
        // If no, create one with a count of 0
        int count = counts.ContainsKey(word) ? counts[word] : 0;
        // Increment the count
        count++;
        // Update the count in the dictionary
        counts[word] = count;
    
        Context.Logger.Info("Emit: {0}, count: {1}", word, count);
        // Emit the word and count information
        this.ctx.Emit(Constants.DEFAULT_STREAM_ID, new List<SCPTuple> { tuple }, new Values(word, count));
        Context.Logger.Info("Execute exit");
    }
    

Define the topology

Spouts and bolts are arranged in a graph, which defines how the data flows between components. For this topology, the graph is as follows:

Diagram of how the components are arranged

Sentences are emitted from the spout, and are distributed to instances of the Splitter bolt. The Splitter bolt breaks the sentences into words, which are distributed to the Counter bolt.

Because the word count is held locally in a Counter instance, you want to make sure that a specific word always flows to the same Counter bolt instance. Each instance keeps track of specific words. Since the Splitter bolt maintains no state, it doesn't matter which instance of the splitter receives which sentence.
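The routing guarantee described above is what fields grouping provides: tuples are partitioned by the value of the grouping field, so equal values always land on the same task. A minimal, self-contained sketch of the idea (this hash-modulo scheme is illustrative only, not SCP.NET's or Storm's actual partitioning):

```csharp
using System;

class FieldsGroupingSketch
{
    // Deterministically pick a target task from the grouping field.
    // Equal words always map to the same task index.
    public static int TaskFor(string word, int taskCount)
    {
        int hash = 0;
        foreach (char c in word)
        {
            hash = (hash * 31 + c) & 0x7FFFFFFF; // stable, non-negative
        }
        return hash % taskCount;
    }

    static void Main()
    {
        const int counterTasks = 4;
        // 'jumped' is routed to the same Counter task every time,
        // so the count held locally in that task stays correct.
        Console.WriteLine(TaskFor("jumped", counterTasks) == TaskFor("jumped", counterTasks));
    }
}
```

Actual routing in Storm also depends on how tasks are assigned across workers; the point is only that the routing decision is a pure function of the field value.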

Open Program.cs. The important method is GetTopologyBuilder, which is used to define the topology that is submitted to Storm. Replace the contents of GetTopologyBuilder with the following code to implement the topology described previously:

// Create a new topology named 'WordCount'
TopologyBuilder topologyBuilder = new TopologyBuilder("WordCount" + DateTime.Now.ToString("yyyyMMddHHmmss"));

// Add the spout to the topology.
// Name the component 'sentences'
// Name the field that is emitted as 'sentence'
topologyBuilder.SetSpout(
    "sentences",
    Spout.Get,
    new Dictionary<string, List<string>>()
    {
        {Constants.DEFAULT_STREAM_ID, new List<string>(){"sentence"}}
    },
    1);
// Add the splitter bolt to the topology.
// Name the component 'splitter'
// Name the field that is emitted 'word'
// Use shuffleGrouping to distribute incoming tuples
//   from the 'sentences' spout across instances
//   of the splitter
topologyBuilder.SetBolt(
    "splitter",
    Splitter.Get,
    new Dictionary<string, List<string>>()
    {
        {Constants.DEFAULT_STREAM_ID, new List<string>(){"word"}}
    },
    1).shuffleGrouping("sentences");

// Add the counter bolt to the topology.
// Name the component 'counter'
// Name the fields that are emitted 'word' and 'count'
// Use fieldsGrouping to ensure that tuples are routed
//   to counter instances based on the contents of field
//   position 0 (the word). This could also have been
//   List<string>(){"word"}.
//   This ensures that the word 'jumped', for example, will always
//   go to the same instance
topologyBuilder.SetBolt(
    "counter",
    Counter.Get,
    new Dictionary<string, List<string>>()
    {
        {Constants.DEFAULT_STREAM_ID, new List<string>(){"word", "count"}}
    },
    1).fieldsGrouping("splitter", new List<int>() { 0 });

// Add topology config
topologyBuilder.SetTopologyConfig(new Dictionary<string, string>()
{
    {"topology.kryo.register","[\"[B\"]"}
});

return topologyBuilder;

Submit the topology

  1. In Solution Explorer, right-click the project, and select Submit to Storm on HDInsight.

    Note

    If prompted, enter the credentials for your Azure subscription. If you have more than one subscription, sign in to the one that contains your Storm on HDInsight cluster.

  2. Select your Storm on HDInsight cluster from the Storm Cluster drop-down list, and then select Submit. You can monitor whether the submission is successful by using the Output window.

  3. When the topology has been successfully submitted, the Storm Topologies for the cluster should appear. Select the WordCount topology from the list to view information about the running topology.

    Note

    You can also view Storm Topologies from Server Explorer. Expand Azure > HDInsight, right-click a Storm on HDInsight cluster, and then select View Storm Topologies.

    To view information about the components in the topology, double-click the component in the diagram.

  4. From the Topology Summary view, click Kill to stop the topology.

    Note

    Storm topologies continue to run until they are deactivated, or the cluster is deleted.

Transactional topology

The previous topology is non-transactional. The components in the topology do not implement functionality to replay messages. For an example of a transactional topology, create a project and select Storm Sample as the project type.

Transactional topologies implement the following to support replay of data:

  • Metadata caching: The spout must store metadata about the data emitted, so that the data can be retrieved and emitted again if a failure occurs. Because the data emitted by the sample is small, the raw data for each tuple is stored in a dictionary for replay.

  • Ack: Each bolt in the topology can call this.ctx.Ack(tuple) to acknowledge that it has successfully processed a tuple. When all bolts have acknowledged the tuple, the Ack method of the spout is invoked. The Ack method allows the spout to remove data that was cached for replay.

  • Fail: Each bolt can call this.ctx.Fail(tuple) to indicate that processing has failed for a tuple. The failure propagates to the Fail method of the spout, where the tuple can be replayed by using cached metadata.

  • Sequence ID: When emitting a tuple, a unique sequence ID can be specified. This value identifies the tuple for replay (Ack and Fail) processing. For example, the spout in the Storm Sample project uses the following when emitting data:

      this.ctx.Emit(Constants.DEFAULT_STREAM_ID, new Values(sentence), lastSeqId);
    

    This code emits a tuple that contains a sentence to the default stream, with the sequence ID value contained in lastSeqId. For this example, lastSeqId is incremented for every tuple emitted.

As demonstrated in the Storm Sample project, whether a component is transactional can be set at runtime, based on configuration.
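The cache/Ack/Fail contract described above can be sketched independently of SCP.NET. The CachingSpout class below is purely illustrative (its names and shape are hypothetical, not the SCP.NET spout interface): it caches each emitted sentence by sequence ID, forgets it on Ack, and re-emits it from the cache on Fail.

```csharp
using System;
using System.Collections.Generic;

// Illustrative only: models the metadata-cache / Ack / Fail contract,
// not the real SCP.NET spout interface.
public class CachingSpout
{
    private long lastSeqId = 0;
    // Metadata cache: sequence ID -> raw data, kept until acked
    private readonly Dictionary<long, string> cache = new Dictionary<long, string>();
    // Everything sent downstream, including replays
    public readonly List<string> Emitted = new List<string>();

    public long Emit(string sentence)
    {
        long seqId = ++lastSeqId;   // unique sequence ID per tuple
        cache[seqId] = sentence;    // remember it in case of replay
        Emitted.Add(sentence);
        return seqId;
    }

    // All downstream bolts processed the tuple: safe to forget it
    public void Ack(long seqId)
    {
        cache.Remove(seqId);
    }

    // A bolt failed the tuple: replay it from the cached metadata
    public void Fail(long seqId)
    {
        string sentence;
        if (cache.TryGetValue(seqId, out sentence))
        {
            Emitted.Add(sentence);
        }
    }

    public int PendingCount { get { return cache.Count; } }
}

class Demo
{
    static void Main()
    {
        var spout = new CachingSpout();
        long id = spout.Emit("the cow jumped over the moon");
        spout.Fail(id);   // downstream failure: tuple is replayed
        spout.Ack(id);    // success: cache entry removed
        Console.WriteLine(spout.Emitted.Count);  // 2 (original + replay)
        Console.WriteLine(spout.PendingCount);   // 0
    }
}
```

In the real Storm Sample project the same bookkeeping is done against the SCP.NET context, and the sequence ID travels with the tuple so Ack and Fail callbacks can find the cached entry.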

Hybrid topology with C# and Java

You can also use the Data Lake tools for Visual Studio to create hybrid topologies, where some components are written in C# and others in Java.

For an example of a hybrid topology, create a project and select Storm Hybrid Sample. This sample type demonstrates the following concepts:

  • Java spout and C# bolt: Defined in HybridTopology_javaSpout_csharpBolt.

    • A transactional version is defined in HybridTopologyTx_javaSpout_csharpBolt.
  • C# spout and Java bolt: Defined in HybridTopology_csharpSpout_javaBolt.

    • A transactional version is defined in HybridTopologyTx_csharpSpout_javaBolt.

    Note

    This version also demonstrates how to use Clojure code from a text file as a Java component.

To switch the topology that is used when the project is submitted, move the [Active(true)] statement to the topology you want to use, before submitting it to the cluster.

Note

All the required Java files are provided as part of this project in the JavaDependency folder.

Consider the following when you are creating and submitting a hybrid topology:

  • Use JavaComponentConstructor to create an instance of the Java class for a spout or bolt.

  • Use microsoft.scp.storm.multilang.CustomizedInteropJSONSerializer to serialize data passed into or out of Java components; it serializes the data between Java objects and JSON.

  • When submitting the topology to the server, you must use the Additional configurations option to specify the Java File paths. The path specified should be the directory that contains the JAR files with your Java classes.

Azure Event Hubs

SCP.NET version 0.9.4.203 introduces a new class and method specifically for working with the Event Hub spout (a Java spout that reads from Event Hubs). When you create a topology that uses an Event Hub spout, use the following methods:

  • EventHubSpoutConfig class: Creates an object that contains the configuration for the spout component.

  • TopologyBuilder.SetEventHubSpout method: Adds the Event Hub spout component to the topology.

Note

You must still use the CustomizedInteropJSONSerializer to serialize data produced by the spout.

Use ConfigurationManager

Don't use ConfigurationManager to retrieve configuration values from bolt and spout components. Doing so can cause a null pointer exception. Instead, the configuration for your project is passed into the Storm topology as key/value pairs in the topology context. Each component that relies on configuration values must retrieve them from the context during initialization.

The following code demonstrates how to retrieve these values:

public class MyComponent : ISCPBolt
{
    // To hold configuration information loaded from context
    Configuration configuration;
    ...
    public MyComponent(Context ctx, Dictionary<string, Object> parms)
    {
        // Save a copy of the context for this component instance
        this.ctx = ctx;
        // If it exists, load the configuration for the component
        if(parms.ContainsKey(Constants.USER_CONFIG))
        {
            this.configuration = parms[Constants.USER_CONFIG] as System.Configuration.Configuration;
        }
        // Retrieve the value of "Foo" from configuration
        var foo = this.configuration.AppSettings.Settings["Foo"].Value;
    }
    ...
}

If you use a Get method to return an instance of your component, you must ensure that it passes both the Context and Dictionary<string, Object> parameters to the constructor. The following example is a basic Get method that properly passes these values:

public static MyComponent Get(Context ctx, Dictionary<string, Object> parms)
{
    return new MyComponent(ctx, parms);
}

How to update SCP.NET

Recent releases of SCP.NET support package upgrade through NuGet. When a new update is available, you receive an upgrade notification. To manually check for an upgrade, follow these steps:

  1. In Solution Explorer, right-click the project, and select Manage NuGet Packages.

  2. From the package manager, select Updates. If an update is available, it is listed. Click Update for the package to install it.

Important

If your project was created with an earlier version of SCP.NET that did not use NuGet, you must perform the following steps to update to a newer version:

  1. In Solution Explorer, right-click the project, and select Manage NuGet Packages.
  2. Using the Search field, search for, and then add, Microsoft.SCP.Net.SDK to the project.

Troubleshoot common issues with topologies

Null pointer exceptions

When you use a C# topology with a Linux-based HDInsight cluster, bolt and spout components that use ConfigurationManager to read configuration settings at runtime may throw null pointer exceptions.

The configuration for your project is passed into the Storm topology as key/value pairs in the topology context. It can be retrieved from the dictionary object that is passed to your components when they are initialized.

For more information, see the Use ConfigurationManager section of this document.

System.TypeLoadException

When you use a C# topology with a Linux-based HDInsight cluster, you may encounter the following error:

System.TypeLoadException: Failure has occurred while loading a type.

This error occurs when you use a binary that is not compatible with the version of .NET that Mono supports.

For Linux-based HDInsight clusters, make sure that your project uses binaries compiled for .NET 4.5.
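In a classic Visual Studio project file, the target framework is controlled by the TargetFrameworkVersion MSBuild property. A fragment of what the .csproj should contain (surrounding project content elided):

```xml
<PropertyGroup>
  <TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
</PropertyGroup>
```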

Test a topology locally

Although it is easy to deploy a topology to a cluster, in some cases you may need to test a topology locally. Use the following steps to run and test the example topology in this article locally in your development environment.

Warning

Local testing only works for basic, C#-only topologies. You cannot use local testing for hybrid topologies or for topologies that use multiple streams.

  1. In Solution Explorer, right-click the project, and select Properties. In the project properties, change the Output type to Console Application.

    Screenshot of project properties, with Output type highlighted

    Note

    Remember to change the Output type back to Class Library before you deploy the topology to a cluster.

  2. In Solution Explorer, right-click the project, and then select Add > New Item. Select Class, and enter LocalTest.cs as the class name. Finally, click Add.

  3. Open LocalTest.cs, and add the following using statement at the top:

    using Microsoft.SCP;
    
  4. Use the following code as the contents of the LocalTest class:

    // Drives the topology components
    public void RunTestCase()
    {
        // An empty dictionary for use when creating components
        Dictionary<string, Object> emptyDictionary = new Dictionary<string, object>();
    
        #region Test the spout
        {
            Console.WriteLine("Starting spout");
            // LocalContext is a local-mode context that can be used to initialize
            // components in the development environment.
            LocalContext spoutCtx = LocalContext.Get();
            // Get a new instance of the spout, using the local context
            Spout sentences = Spout.Get(spoutCtx, emptyDictionary);
    
            // Emit 10 tuples
            for (int i = 0; i < 10; i++)
            {
                sentences.NextTuple(emptyDictionary);
            }
            // Use LocalContext to persist the data stream to file
            spoutCtx.WriteMsgQueueToFile("sentences.txt");
            Console.WriteLine("Spout finished");
        }
        #endregion
    
        #region Test the splitter bolt
        {
            Console.WriteLine("Starting splitter bolt");
            // LocalContext is a local-mode context that can be used to initialize
            // components in the development environment.
            LocalContext splitterCtx = LocalContext.Get();
            // Get a new instance of the bolt
            Splitter splitter = Splitter.Get(splitterCtx, emptyDictionary);
    
            // Set the data stream to the data created by the spout
            splitterCtx.ReadFromFileToMsgQueue("sentences.txt");
            // Get a batch of tuples from the stream
            List<SCPTuple> batch = splitterCtx.RecvFromMsgQueue();
            // Process each tuple in the batch
            foreach (SCPTuple tuple in batch)
            {
                splitter.Execute(tuple);
            }
            // Use LocalContext to persist the data stream to file
            splitterCtx.WriteMsgQueueToFile("splitter.txt");
            Console.WriteLine("Splitter bolt finished");
        }
        #endregion
    
        #region Test the counter bolt
        {
            Console.WriteLine("Starting counter bolt");
            // LocalContext is a local-mode context that can be used to initialize
            // components in the development environment.
            LocalContext counterCtx = LocalContext.Get();
            // Get a new instance of the bolt
            Counter counter = Counter.Get(counterCtx, emptyDictionary);
    
            // Set the data stream to the data created by splitter bolt
            counterCtx.ReadFromFileToMsgQueue("splitter.txt");
            // Get a batch of tuples from the stream
            List<SCPTuple> batch = counterCtx.RecvFromMsgQueue();
            // Process each tuple in the batch
            foreach (SCPTuple tuple in batch)
            {
                counter.Execute(tuple);
            }
            // Use LocalContext to persist the data stream to file
            counterCtx.WriteMsgQueueToFile("counter.txt");
            Console.WriteLine("Counter bolt finished");
        }
        #endregion
    }
    

    Take a moment to read through the code comments. This code uses LocalContext to run the components in the development environment, and it persists the data stream between components to text files on the local drive.

  5. Open Program.cs, and add the following code to the Main method:

    Console.WriteLine("Starting tests");
    System.Environment.SetEnvironmentVariable("microsoft.scp.logPrefix", "WordCount-LocalTest");
    // Initialize the runtime
    SCPRuntime.Initialize();
    
    //If we are not running under the local context, throw an error
    if (Context.pluginType != SCPPluginType.SCP_NET_LOCAL)
    {
        throw new Exception(string.Format("unexpected pluginType: {0}", Context.pluginType));
    }
    // Create test instance
    LocalTest tests = new LocalTest();
    // Run tests
    tests.RunTestCase();
    Console.WriteLine("Tests finished");
    Console.ReadKey();
    
  6. Save the changes, and then press F5 or select Debug > Start Debugging to start the project. A console window should appear and log status as the tests progress. When Tests finished appears, press any key to close the window.

  7. Use Windows Explorer to locate the directory that contains your project. For example: C:\Users\<your_user_name>\Documents\Visual Studio 2013\Projects\WordCount\WordCount. In this directory, open Bin, and then click Debug. You should see the text files that were produced when the tests ran: sentences.txt, counter.txt, and splitter.txt. Open each text file and inspect the data.

    Note

    String data persists as an array of decimal character codes in these files. For example, [[97,103,111]] in the splitter.txt file decodes to the word ago.
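    The decimal values are character codes, so an entry can be decoded with a few lines of C# (a standalone sketch, not part of the topology project):

    ```csharp
    using System;

    class DecodeExample
    {
        static void Main()
        {
            // One serialized word from splitter.txt, as character codes.
            int[] codes = { 97, 103, 111 };
            // Convert each code to its character and join them into a string.
            string word = new string(Array.ConvertAll(codes, c => (char)c));
            Console.WriteLine(word);   // prints "ago"
        }
    }
    ```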

Note

Be sure to set the Output type back to Class Library before deploying to a Storm on HDInsight cluster.

Log information

You can easily log information from your topology components by using Context.Logger. For example, the following command creates an informational log entry:

Context.Logger.Info("Component started");

Logged information can be viewed from the Hadoop Service Log, which is found in Server Explorer. Expand the entry for your Storm on HDInsight cluster, and then expand Hadoop Service Log. Finally, select the log file to view.

Note

The logs are stored in the Azure storage account that is used by your cluster. To view the logs in Visual Studio, you must sign in to the Azure subscription that owns the storage account.

View error information

To view errors that have occurred in a running topology, use the following steps:

  1. From Server Explorer, right-click the Storm on HDInsight cluster, and select View Storm topologies.

  2. For the spouts and bolts, the Last Error column contains information on the last error.

  3. Select the Spout Id or Bolt Id for the component that has an error listed. On the details page that is displayed, additional error information is listed in the Errors section at the bottom of the page.

  4. To obtain more information, select a Port from the Executors section of the page to see the Storm worker log for the last few minutes.

Errors submitting topologies

If you encounter errors submitting a topology to HDInsight, you can find logs for the server-side components that handle topology submission on your HDInsight cluster. To retrieve these logs, use the following command from a command line:

scp sshuser@clustername-ssh.azurehdinsight.cn:/var/log/hdinsight-scpwebapi/hdinsight-scpwebapi.out .

sshuser 替换为群集的 SSH 用户帐户。Replace sshuser with the SSH user account for the cluster. clustername 替换为 HDInsight 群集的名称。Replace clustername with the name of the HDInsight cluster. 有关将 scpssh 与 HDInsight 配合使用的详细信息,请参阅将 SSH 与 HDInsight 配合使用For more information on using scp and ssh with HDInsight, see Use SSH with HDInsight.

Submissions can fail for multiple reasons:

  • The JDK is not installed or is not in the path.
  • Required Java dependencies are not included in the submission.
  • Dependencies are incompatible.
  • Topology names are duplicated.

If the hdinsight-scpwebapi.out log contains a FileNotFoundException, the following conditions might be the cause:

  • The JDK is not in the path on the development environment. Verify that the JDK is installed in the development environment, and that %JAVA_HOME%/bin is in the path.
  • A Java dependency is missing. Make sure you include any required .jar files as part of the submission.

Next steps

For an example of processing data from Event Hubs, see Process events from Azure Event Hubs with Storm on HDInsight.

For an example of a C# topology that splits stream data into multiple streams, see C# Storm example.

To discover more information about creating C# topologies, see GitHub.

For more ways to work with HDInsight, and for more Storm on HDInsight samples, see the following documents:

Microsoft SCP.NET

Apache Storm on HDInsight

Apache Hadoop on HDInsight

Apache HBase on HDInsight