SCP programming guide for Apache Storm in Azure HDInsight

SCP is a platform for building real-time, reliable, consistent, and high-performance data processing applications. It's built on top of Apache Storm, a stream-processing system designed by open-source software communities. Nathan Marz created Storm, and Twitter published it as open source. Storm uses Apache ZooKeeper, another Apache project, for highly reliable distributed coordination and state management.

The SCP project has not only ported Storm to Windows but also added extensions and customization for the Windows environment. The extensions include the .NET developer experience and .NET libraries. The customization includes Windows-based deployment.

With the extensions and customization, you don't need to fork the open-source software projects. You can use derived environments that are built on top of Storm.

Processing model

The data in SCP is modeled as continuous streams of tuples. Typically, the tuples:

  1. Flow into a queue.
  2. Are picked up and transformed by business logic hosted inside a Storm topology.
  3. Either have their output piped as tuples to another SCP system or are committed to stores like distributed file systems and databases like SQL Server.

Figure: a queue feeds data to processing, which in turn feeds a data store.

In Storm, an application topology defines a computation graph. Each node in a topology contains processing logic. Links between nodes indicate data flow.

Nodes that inject input data into the topology are called spouts. You can use them to sequence the data. The input data could come from a source like file logs, a transactional database, or a system performance counter.

Nodes that have both input and output data flows are called bolts. They do the actual data filtering, selection, and aggregation.

SCP supports best-effort, at-least-once, and exactly-once data processing.

In a distributed stream-processing application, errors might happen during data processing. Such errors include a network outage, a machine failure, or an error in your code. At-least-once processing ensures that all data is processed at least once by automatically replaying the same data when an error happens.

At-least-once processing is simple and reliable, and it suits many applications. But when an application requires exact counting, at-least-once processing is insufficient because the same data could be replayed in the application topology. In that case, exactly-once processing makes sure the result is correct even when the data is replayed and processed multiple times.
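To make the difference concrete, here is a minimal sketch in plain C#. It doesn't use any SCP.NET API, and all names in it are hypothetical. The naive counter double-counts a replayed batch. The second counter remembers the ID of the last applied transaction and skips a batch it has already applied, which assumes that batches are committed in increasing transaction-ID order.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration; none of these types are part of SCP.NET.
public static class ReplayDemo
{
    // At-least-once style: every delivery is added, so a replay is counted twice.
    public static long NaiveCount(IEnumerable<(long TxId, int Items)> deliveries)
    {
        long total = 0;
        foreach (var d in deliveries)
        {
            total += d.Items;
        }
        return total;
    }

    // Exactly-once style: remember the last applied transaction ID and
    // ignore any batch whose ID is not newer (assumes in-order commits).
    public static long IdempotentCount(IEnumerable<(long TxId, int Items)> deliveries)
    {
        long total = 0;
        long lastAppliedTxId = 0;
        foreach (var d in deliveries)
        {
            if (d.TxId <= lastAppliedTxId)
            {
                continue; // replayed batch, already reflected in total
            }
            total += d.Items;
            lastAppliedTxId = d.TxId;
        }
        return total;
    }

    public static void Main()
    {
        // Batch 2 is delivered twice, as it would be after a failure and replay.
        var deliveries = new List<(long TxId, int Items)>
        {
            (1, 10), (2, 5), (2, 5), (3, 7)
        };
        Console.WriteLine(NaiveCount(deliveries));      // 27
        Console.WriteLine(IdempotentCount(deliveries)); // 22
    }
}
```

The second method is only one way to get a replay-safe result. It works here because the stored count and the stored transaction ID are updated together.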

SCP lets .NET developers create real-time data processing applications while using a Java Virtual Machine (JVM) with Storm. The JVM and .NET communicate via local TCP sockets. Each spout or bolt is a .NET/Java process pair, where the user logic runs in the .NET process as a plug-in.

To build a data processing application on top of SCP, follow these steps:

  1. Design and implement spouts to pull in data from queues.
  2. Design and implement bolts that process the input data and save it to external stores like a database.
  3. Design the topology, then submit and run it.

The topology defines vertices and the data that flows between them. SCP takes a topology specification and deploys it on a Storm cluster, where each vertex runs on one logical node. The Storm task scheduler takes care of failover and scaling.

This article uses some simple examples to walk through how to build data processing applications with SCP.

SCP plug-in interface

SCP plug-ins are standalone applications. They can run inside Visual Studio during development and be plugged into the Storm pipeline after production deployment.

Writing an SCP plug-in is the same as writing any other Windows console application. The SCP.NET platform declares interfaces for spouts and bolts, and your plug-in code implements them. This design lets you focus on your business logic while the SCP.NET platform handles everything else.

Your plug-in code implements one of the following interfaces. Which one depends on whether the topology is transactional or nontransactional and whether the component is a spout or a bolt.

  • ISCPSpout
  • ISCPBolt
  • ISCPTxSpout
  • ISCPBatchBolt

ISCPPlugin

ISCPPlugin is the common interface for many plug-ins. Currently, it's a dummy interface with no members.

public interface ISCPPlugin
{
}

ISCPSpout

ISCPSpout is the interface for a nontransactional spout.

public interface ISCPSpout : ISCPPlugin
{
    void NextTuple(Dictionary<string, Object> parms);
    void Ack(long seqId, Dictionary<string, Object> parms); 
    void Fail(long seqId, Dictionary<string, Object> parms);
}

When NextTuple is called, your C# code may emit one or more tuples. If there's nothing to emit, this method should return without emitting anything.

The NextTuple, Ack, and Fail methods are all called in a tight loop in a single thread of a C# process. When there are no tuples to emit, have NextTuple sleep for a short time, such as 10 milliseconds. This sleep avoids wasting CPU cycles.

The Ack and Fail methods are called only when the specification file enables the acknowledgment mechanism. The seqId parameter identifies the tuple that is acknowledged or has failed. If acknowledgment is enabled in a nontransactional topology, use the following Emit function in a spout:

public abstract void Emit(string streamId, List<object> values, long seqId);

If a nontransactional topology doesn't support acknowledgment, Ack and Fail can be left as empty functions.

The parms input parameter in these functions specifies an empty dictionary and is reserved for future use.
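The guidance for NextTuple, Ack, and Fail can be sketched as follows. To keep the sketch compilable on its own, the two interfaces are redeclared as stand-ins copied from this article; a real plug-in gets them from the SCP.NET assembly. QueueSpout, its in-memory queue, and the emit delegate (which stands in for the Context object's Emit method) are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Stand-ins copied from the declarations in this article so the sketch
// compiles on its own; a real plug-in gets them from the SCP.NET assembly.
public interface ISCPPlugin { }
public interface ISCPSpout : ISCPPlugin
{
    void NextTuple(Dictionary<string, Object> parms);
    void Ack(long seqId, Dictionary<string, Object> parms);
    void Fail(long seqId, Dictionary<string, Object> parms);
}

// Hypothetical spout that drains an in-memory queue.
public class QueueSpout : ISCPSpout
{
    private readonly Queue<string> pending;
    private readonly Action<List<object>> emit; // stands in for Context.Emit

    public QueueSpout(Queue<string> pending, Action<List<object>> emit)
    {
        this.pending = pending;
        this.emit = emit;
    }

    public void NextTuple(Dictionary<string, Object> parms)
    {
        if (pending.Count == 0)
        {
            // Nothing to emit: back off briefly instead of spinning.
            Thread.Sleep(10);
            return;
        }
        emit(new List<object> { pending.Dequeue() });
    }

    // Acknowledgment is not enabled in this sketch, so both stay empty.
    public void Ack(long seqId, Dictionary<string, Object> parms) { }
    public void Fail(long seqId, Dictionary<string, Object> parms) { }
}

public static class QueueSpoutDemo
{
    public static void Main()
    {
        var emitted = new List<string>();
        var spout = new QueueSpout(
            new Queue<string>(new[] { "a", "b" }),
            values => emitted.Add((string)values[0]));

        // The platform would call NextTuple in a tight loop; three calls here.
        for (int i = 0; i < 3; i++)
        {
            spout.NextTuple(new Dictionary<string, Object>());
        }
        Console.WriteLine(string.Join(",", emitted)); // a,b
    }
}
```

The third call finds the queue empty, sleeps for 10 milliseconds, and returns without emitting.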

ISCPBolt

ISCPBolt is the interface for a nontransactional bolt.

public interface ISCPBolt : ISCPPlugin
{
    void Execute(SCPTuple tuple);
}

When a new tuple is available, the Execute function is called to process it.

ISCPTxSpout

ISCPTxSpout is the interface for a transactional spout.

public interface ISCPTxSpout : ISCPPlugin
{
    void NextTx(out long seqId, Dictionary<string, Object> parms);  
    void Ack(long seqId, Dictionary<string, Object> parms);         
    void Fail(long seqId, Dictionary<string, Object> parms);        
}

Just like their nontransactional counterparts, NextTx, Ack, and Fail are all called in a tight loop in a single thread of a C# process. When there are no tuples to emit, have NextTx sleep for a short time, such as 10 milliseconds. This sleep avoids wasting CPU cycles.

When NextTx is called to start a new transaction, the seqId output parameter identifies the transaction. The same identifier is also passed to Ack and Fail. Your NextTx method can emit data to the Java side. The data is stored in ZooKeeper to support replay. Because ZooKeeper has limited capacity, your code should emit only metadata and not bulk data in a transactional spout.

Because Storm automatically replays a failed transaction, Fail usually won't be called. But if SCP can check the metadata emitted by a transactional spout, it can call Fail when the metadata is invalid.

The parms input parameter in these functions specifies an empty dictionary and is reserved for future use.
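As an illustration of emitting only metadata, the following sketch shows a transactional spout whose transactions each describe a fixed-size range of records in some external source. The interfaces are stand-ins copied from this article so that the sketch compiles on its own. RangeTxSpout, the batch size, and the emit delegate (which stands in for the Context object's Emit method) are hypothetical, and the sketch assumes that downstream bolts read the actual records themselves.

```csharp
using System;
using System.Collections.Generic;

// Stand-ins copied from this article so the sketch compiles on its own.
public interface ISCPPlugin { }
public interface ISCPTxSpout : ISCPPlugin
{
    void NextTx(out long seqId, Dictionary<string, Object> parms);
    void Ack(long seqId, Dictionary<string, Object> parms);
    void Fail(long seqId, Dictionary<string, Object> parms);
}

// Hypothetical transactional spout: each transaction covers a fixed-size
// range of records in an external source, and only the range is emitted.
public class RangeTxSpout : ISCPTxSpout
{
    private const long BatchSize = 100;
    private readonly Action<List<object>> emit; // stands in for Context.Emit
    private long lastSeqId = 0;

    public RangeTxSpout(Action<List<object>> emit)
    {
        this.emit = emit;
    }

    public void NextTx(out long seqId, Dictionary<string, Object> parms)
    {
        seqId = ++lastSeqId;
        long firstRecord = (seqId - 1) * BatchSize;
        // Metadata only: where the batch starts and how many records it has.
        emit(new List<object> { firstRecord, BatchSize });
    }

    public void Ack(long seqId, Dictionary<string, Object> parms) { }
    public void Fail(long seqId, Dictionary<string, Object> parms) { }
}

public static class RangeTxSpoutDemo
{
    public static void Main()
    {
        var spout = new RangeTxSpout(
            values => Console.WriteLine("{0} +{1}", values[0], values[1]));
        long seqId;
        spout.NextTx(out seqId, new Dictionary<string, Object>()); // 0 +100
        spout.NextTx(out seqId, new Dictionary<string, Object>()); // 100 +100
        Console.WriteLine(seqId); // 2
    }
}
```

Each emitted tuple carries two numbers regardless of how large the batch is, which keeps the data stored for replay small.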

ISCPBatchBolt

ISCPBatchBolt is the interface for a transactional bolt.

public interface ISCPBatchBolt : ISCPPlugin
{
    void Execute(SCPTuple tuple);
    void FinishBatch(Dictionary<string, Object> parms);  
}

The Execute method is called when a new tuple arrives at the bolt. The FinishBatch method is called when this transaction ends. The parms input parameter is reserved for future use.

For a transactional topology, StormTxAttempt is an important class. It has two members: TxId and AttemptId. The TxId member identifies a specific transaction. A transaction might be attempted multiple times if it fails and is replayed, and the AttemptId member tells those attempts apart.

SCP.NET creates a new ISCPBatchBolt object to process each StormTxAttempt object, just as Storm does in Java. The purpose of this design is to support parallel transaction processing. After a transaction attempt is complete, the corresponding ISCPBatchBolt object is destroyed and garbage collected.

Object model

SCP.NET also provides a simple set of key objects for developers to program with. The objects are Context, StateStore, and SCPRuntime. They're discussed in this section.

Context

The Context object provides a running environment to an application. Each ISCPPlugin instance of ISCPSpout, ISCPBolt, ISCPTxSpout, or ISCPBatchBolt has a corresponding Context instance. The functionality provided by Context is divided into these two parts:

  • The static part, which is available in the whole C# process
  • The dynamic part, which is available only for the specific Context instance

Static part

public static ILogger Logger = null;
public static SCPPluginType pluginType;
public static Config Config { get; set; }
public static TopologyContext TopologyContext { get; set; }  

The Logger object is provided for logging purposes.

The pluginType field indicates the plug-in type of the C# process. If the process runs in local test mode without Java, the plug-in type is SCP_NET_LOCAL.

public enum SCPPluginType 
{
    SCP_NET_LOCAL = 0,
    SCP_NET_SPOUT = 1,
    SCP_NET_BOLT = 2,
    SCP_NET_TX_SPOUT = 3,
    SCP_NET_BATCH_BOLT = 4  
}

The Config property gets configuration parameters from the Java side, which passes them when a C# plug-in is initialized. The Config parameters are divided into two parts: stormConf and pluginConf.

public Dictionary<string, Object> stormConf { get; set; }  
public Dictionary<string, Object> pluginConf { get; set; }  

The stormConf part holds parameters defined by Storm, and the pluginConf part holds parameters defined by SCP. Here's an example:

public class Constants
{
    … …

    // constant string for pluginConf
    public static readonly String NONTRANSACTIONAL_ENABLE_ACK = "nontransactional.ack.enabled";  

    // constant string for stormConf
    public static readonly String STORM_ZOOKEEPER_SERVERS = "storm.zookeeper.servers";
    public static readonly String STORM_ZOOKEEPER_PORT = "storm.zookeeper.port";
}

The TopologyContext property gets the topology context. It's most useful for components that run as multiple parallel tasks. Here's an example:

//demo how to get TopologyContext info
if (Context.pluginType != SCPPluginType.SCP_NET_LOCAL)
{
    Context.Logger.Info("TopologyContext info:");
    TopologyContext topologyContext = Context.TopologyContext;
    Context.Logger.Info("taskId: {0}", topologyContext.GetThisTaskId());
    taskIndex = topologyContext.GetThisTaskIndex();
    Context.Logger.Info("taskIndex: {0}", taskIndex);
    string componentId = topologyContext.GetThisComponentId();
    Context.Logger.Info("componentId: {0}", componentId);
    List<int> componentTasks = topologyContext.GetComponentTasks(componentId);  
    Context.Logger.Info("taskNum: {0}", componentTasks.Count);
}

Dynamic part

The following interfaces are pertinent to a certain Context instance, which is created by the SCP.NET platform and passed to your code:

// Declare the Output and Input Stream Schemas

public void DeclareComponentSchema(ComponentStreamSchema schema);

// Emit tuple to default stream.
public abstract void Emit(List<object> values);

// Emit tuple to the specific stream.
public abstract void Emit(string streamId, List<object> values);  

For a nontransactional spout that supports acknowledgment, the following method is provided:

// for nontransactional spout that supports ack
public abstract void Emit(string streamId, List<object> values, long seqId);  

A nontransactional bolt that supports acknowledgment should explicitly call Ack or Fail with the tuple it received. When emitting a new tuple, the bolt must also specify the tuple's anchors. The following methods are provided:

public abstract void Emit(string streamId, IEnumerable<SCPTuple> anchors, List<object> values);
public abstract void Ack(SCPTuple tuple);
public abstract void Fail(SCPTuple tuple);
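The following sketch shows one way a bolt might combine these three calls. It is not the SCP.NET implementation: IBoltContext is a hypothetical interface that stands in for the Context instance and carries only the three signatures listed above, SCPTuple is a simplified placeholder with a hypothetical Values field, and UpperCaseBolt and RecordingContext are illustrative names.

```csharp
using System;
using System.Collections.Generic;

// Simplified, hypothetical placeholder; not the real SCP.NET SCPTuple class.
public class SCPTuple
{
    public List<object> Values = new List<object>();
}

// Hypothetical stand-in for the Context instance. The three signatures
// mirror the methods listed in this article.
public interface IBoltContext
{
    void Emit(string streamId, IEnumerable<SCPTuple> anchors, List<object> values);
    void Ack(SCPTuple tuple);
    void Fail(SCPTuple tuple);
}

// Hypothetical bolt that upper-cases the first field of each tuple.
public class UpperCaseBolt
{
    private readonly IBoltContext ctx;

    public UpperCaseBolt(IBoltContext ctx)
    {
        this.ctx = ctx;
    }

    public void Execute(SCPTuple tuple)
    {
        try
        {
            string word = (string)tuple.Values[0];
            // Anchor the new tuple to the input tuple it was derived from.
            ctx.Emit("default", new List<SCPTuple> { tuple },
                     new List<object> { word.ToUpperInvariant() });
            ctx.Ack(tuple);
        }
        catch (Exception)
        {
            // Report the failure instead of silently dropping the tuple.
            ctx.Fail(tuple);
        }
    }
}

// Test double that records which calls the bolt makes.
public class RecordingContext : IBoltContext
{
    public List<string> Log = new List<string>();

    public void Emit(string streamId, IEnumerable<SCPTuple> anchors, List<object> values)
    {
        Log.Add("emit:" + values[0]);
    }

    public void Ack(SCPTuple tuple) { Log.Add("ack"); }
    public void Fail(SCPTuple tuple) { Log.Add("fail"); }
}

public static class UpperCaseBoltDemo
{
    public static void Main()
    {
        var ctx = new RecordingContext();
        var bolt = new UpperCaseBolt(ctx);

        var good = new SCPTuple();
        good.Values.Add("hello");
        bolt.Execute(good);

        var bad = new SCPTuple();
        bad.Values.Add(42); // not a string, so the cast in Execute throws
        bolt.Execute(bad);

        Console.WriteLine(string.Join(",", ctx.Log)); // emit:HELLO,ack,fail
    }
}
```

Acknowledging only after the emit succeeds means that a tuple is never acknowledged if its derived output was not sent.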

StateStore

The StateStore object provides metadata services, monotonic sequence generation, and wait-free coordination. You can build higher-level distributed concurrency abstractions on StateStore. These abstractions include distributed locks, distributed queues, barriers, and transaction services.

SCP applications can use the State object to persist information in Apache ZooKeeper. This ability is especially valuable for a transactional topology. If a transactional spout stops responding and restarts, State can retrieve the necessary information from ZooKeeper and restart the pipeline.

The StateStore object has these principal methods:

/// <summary>
/// Static method to retrieve a state store of the given path and connStr 
/// </summary>
/// <param name="storePath">StateStore path</param>
/// <param name="connStr">StateStore address</param>
/// <returns>Instance of StateStore</returns>
public static StateStore Get(string storePath, string connStr);

/// <summary>
/// Create a new state object in this state store instance
/// </summary>
/// <returns>State from StateStore</returns>
public State Create();

/// <summary>
/// Retrieve all states that were previously uncommitted, excluding all exited states
/// </summary>
/// <returns>Uncommitted states</returns>
public IEnumerable<State> GetUnCommitted();

/// <summary>
/// Get all the states in the StateStore
/// </summary>
/// <returns>All the states</returns>
public IEnumerable<State> States();

/// <summary>
/// Get state or registry object
/// </summary>
/// <param name="info">Registry name (registry only)</param>
/// <typeparam name="T">Type, registry or state</typeparam>
/// <returns>Return registry or state</returns>
public T Get<T>(string info = null);

/// <summary>
/// List all the committed states
/// </summary>
/// <returns>Registries containing the committed state </returns>
public IEnumerable<Registry> Committed();

/// <summary>
/// List all the exited states in the StateStore
/// </summary>
/// <returns>Registries containing the exited states</returns>
public IEnumerable<Registry> Aborted();

/// <summary>
/// Retrieve an existing state object from this state store instance 
/// </summary>
/// <param name="stateId">ID of the state</param>
/// <returns>State from StateStore</returns>
public State GetState(long stateId);

The State object has these principal methods:

/// <summary>
/// Set the status of the state object to commit
/// </summary>
public void Commit(bool simpleMode = true);

/// <summary>
/// Set the status of the state object to exit
/// </summary>
public void Abort();

/// <summary>
/// Put an attribute value under the given key
/// </summary>
/// <param name="key">Key</param>
/// <param name="attribute">State attribute</param>
public void PutAttribute<T>(string key, T attribute);

/// <summary>
/// Get the attribute value associated with the given key
/// </summary>
/// <param name="key">Key</param>
/// <returns>State attribute</returns>
public T GetAttribute<T>(string key);

When simpleMode is set to true, the Commit method deletes the corresponding ZNode in ZooKeeper. Otherwise, the method deletes the current ZNode and adds a new node in the COMMITTED_PATH.

SCPRuntime

The SCPRuntime class provides the following two methods:

public static void Initialize();

public static void LaunchPlugin(newSCPPlugin createDelegate);  

The Initialize method initializes the SCP runtime environment. In this method, the C# process connects to the Java side to get configuration parameters and topology context.

The LaunchPlugin method starts the message-processing loop. In this loop, the C# plug-in receives messages from the Java side. These messages include tuples and control signals. The plug-in then processes the messages, perhaps by calling the interface method provided by your code.

The input parameter for LaunchPlugin is a delegate. The delegate returns an object that implements the ISCPSpout, ISCPBolt, ISCPTxSpout, or ISCPBatchBolt interface.

public delegate ISCPPlugin newSCPPlugin(Context ctx, Dictionary<string, Object> parms);

For ISCPBatchBolt, you can get a StormTxAttempt object from the parms parameter and use it to determine whether the attempt is a replay. The check for a replayed attempt is often done at the commit bolt. The HelloWorldTx example later in this article demonstrates this check.

SCP plug-ins can usually run in two modes: local test mode and regular mode.

Local test mode

In this mode, the SCP plug-ins in your C# code run inside Visual Studio during the development phase. You can use the ILocalContext interface in this mode. The interface provides methods to serialize the emitted tuples to local files and read them back into RAM.

public interface ILocalContext
{
    List<SCPTuple> RecvFromMsgQueue();
    void WriteMsgQueueToFile(string filepath, bool append = false);  
    void ReadFromFileToMsgQueue(string filepath);
}

Regular mode

In this mode, the Storm Java process runs the SCP plug-ins. Here's an example:

namespace Scp.App.HelloWorld
{
    public class Generator : ISCPSpout
    {
        … …
        public static Generator Get(Context ctx, Dictionary<string, Object> parms)
        {
            return new Generator(ctx);
        }
    }

    class HelloWorld
    {
        static void Main(string[] args)
        {
            /* Setting the environment variable here can change the log file name */
            System.Environment.SetEnvironmentVariable("microsoft.scp.logPrefix", "HelloWorld");

            SCPRuntime.Initialize();
            SCPRuntime.LaunchPlugin(new newSCPPlugin(Generator.Get));
        }
    }
}

Topology Specification language

SCP Topology Specification is a domain-specific language (DSL) for describing and configuring SCP topologies. It's based on Storm's Clojure DSL and is extended by SCP.

You can submit topology specifications directly to a Storm cluster for execution via the runSpec command.

SCP.NET has added the following functions to define transactional topologies:

| New function | Parameters | Description |
| --- | --- | --- |
| tx-topolopy | topology-name, spout-map, bolt-map | Defines a transactional topology with the topology name, spouts definition map, and bolts definition map. |
| scp-tx-spout | exec-name, args, fields | Defines a transactional spout. The function runs the application that's specified by exec-name and uses args. The fields parameter specifies the output fields for the spout. |
| scp-tx-batch-bolt | exec-name, args, fields | Defines a transactional batch bolt. The function runs the application that's specified by exec-name and uses args. The fields parameter specifies the output fields for the bolt. |
| scp-tx-commit-bolt | exec-name, args, fields | Defines a transactional commit bolt. The function runs the application that's specified by exec-name and uses args. The fields parameter specifies the output fields for the bolt. |
| nontx-topology | topology-name, spout-map, bolt-map | Defines a nontransactional topology with the topology name, spouts definition map, and bolts definition map. |
| scp-spout | exec-name, args, fields, parameters | Defines a nontransactional spout. The function runs the application that's specified by exec-name and uses args. The fields parameter specifies the output fields for the spout. The parameters parameter is optional. Use it to specify parameters like "nontransactional.ack.enabled". |
| scp-bolt | exec-name, args, fields, parameters | Defines a nontransactional bolt. The function runs the application that's specified by exec-name and uses args. The fields parameter specifies the output fields for the bolt. The parameters parameter is optional. Use it to specify parameters like "nontransactional.ack.enabled". |

SCP.NET defines the following keywords:

| Keyword | Description |
| --- | --- |
| :name | The topology name |
| :topology | The topology using the functions in the previous table and built-in functions |
| :p | The parallelism hint for each spout or bolt |
| :config | Whether to configure parameters or update the existing ones |
| :schema | The schema of the stream |

SCP.NET also defines these frequently used parameters:

| Parameter | Description |
| --- | --- |
| "plugin.name" | The .exe file name of the C# plug-in |
| "plugin.args" | The plug-in arguments |
| "output.schema" | The output schema |
| "nontransactional.ack.enabled" | Whether acknowledgment is enabled for a nontransactional topology |

The runSpec command is deployed together with the binaries. Here is the command usage:

.\bin\runSpec.cmd
usage: runSpec [spec-file target-dir [resource-dir] [-cp classpath]]
ex: runSpec examples\HelloWorld\HelloWorld.spec specs examples\HelloWorld\Target

The resource-dir parameter is optional. Specify it when you want to plug in a C# application. The specified directory contains the application, dependencies, and configurations.

The classpath parameter is also optional. It specifies the Java classpath if the specification file contains a Java spout or bolt.

Miscellaneous features

Input and output schema declarations

Your C# processes can emit tuples. To do so, the platform serializes tuples into byte[] objects and transfers the objects to the Java side. Storm then transfers these tuples to the targets.

In downstream components, C# processes receive tuples back from the Java side and convert them to the platform's original types. All these operations are hidden by the platform.

To support serialization and deserialization, your code needs to declare the schema of the input and output. The schema is defined as a dictionary in which the key is the stream ID and the value is the list of column types. A component can declare multiple streams.

public class ComponentStreamSchema
{
    public Dictionary<string, List<Type>> InputStreamSchema { get; set; }
    public Dictionary<string, List<Type>> OutputStreamSchema { get; set; }
    public ComponentStreamSchema(Dictionary<string, List<Type>> input, Dictionary<string, List<Type>> output)
    {
        InputStreamSchema = input;
        OutputStreamSchema = output;
    }
}

The following function is added to a Context object:

public void DeclareComponentSchema(ComponentStreamSchema schema);

Developers must ensure that the emitted tuples obey the schema defined for a stream. Otherwise, the system will throw a runtime exception.
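As a minimal sketch of what such a declaration might look like, the following code builds a schema for a component with one input stream and two output streams. ComponentStreamSchema is copied from this article so that the sketch compiles without SCP.NET. The "errors" stream, the column types, and the Matches helper are hypothetical. Matches is an illustrative pre-check of the rule above, not part of the platform.

```csharp
using System;
using System.Collections.Generic;

// Copied from this article so the sketch compiles without SCP.NET.
public class ComponentStreamSchema
{
    public Dictionary<string, List<Type>> InputStreamSchema { get; set; }
    public Dictionary<string, List<Type>> OutputStreamSchema { get; set; }
    public ComponentStreamSchema(Dictionary<string, List<Type>> input, Dictionary<string, List<Type>> output)
    {
        InputStreamSchema = input;
        OutputStreamSchema = output;
    }
}

public static class SchemaDemo
{
    // Hypothetical schema: reads (string) from "default"; emits (string, int)
    // to "default" and (string) to a hypothetical "errors" stream.
    public static ComponentStreamSchema Build()
    {
        var input = new Dictionary<string, List<Type>>
        {
            { "default", new List<Type> { typeof(string) } }
        };
        var output = new Dictionary<string, List<Type>>
        {
            { "default", new List<Type> { typeof(string), typeof(int) } },
            { "errors", new List<Type> { typeof(string) } }
        };
        return new ComponentStreamSchema(input, output);
    }

    // Hypothetical pre-check: do the values match the declared output columns?
    public static bool Matches(ComponentStreamSchema schema, string streamId, List<object> values)
    {
        List<Type> columns;
        if (!schema.OutputStreamSchema.TryGetValue(streamId, out columns))
        {
            return false; // stream was never declared
        }
        if (columns.Count != values.Count)
        {
            return false;
        }
        for (int i = 0; i < columns.Count; i++)
        {
            if (values[i] == null || values[i].GetType() != columns[i])
            {
                return false;
            }
        }
        return true;
    }

    public static void Main()
    {
        ComponentStreamSchema schema = Build();
        Console.WriteLine(Matches(schema, "default", new List<object> { "hello", 5 }));   // True
        Console.WriteLine(Matches(schema, "default", new List<object> { "hello", "5" })); // False
        Console.WriteLine(Matches(schema, "missing", new List<object> { "hello" }));      // False
    }
}
```

In a real plug-in, the object returned by Build would be passed to DeclareComponentSchema on the component's Context instance.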

Multistream support

SCP lets your code emit to or receive from multiple distinct streams at the same time. The Context object reflects this support as the Emit method's optional stream ID parameter.

Two methods have been added to the SCP.NET Context object. They emit one or more tuples to specific streams. The streamId parameter is a string. Its value must be the same in both the C# code and the topology-definition specification.

/* Emit tuple to the specific stream. */
public abstract void Emit(string streamId, List<object> values);

/* for nontransactional spout only */
public abstract void Emit(string streamId, List<object> values, long seqId);

Emitting to a nonexistent stream causes a runtime exception.

Fields grouping

The built-in fields grouping in Storm doesn't work properly in SCP.NET. On the Java proxy side, the data type of all fields is actually byte[]. The fields grouping uses the byte[] object's hash code to do the grouping. That hash code is based on the object's identity (effectively its address in RAM), not on its content. So the grouping is wrong for byte[] objects that have the same content but different addresses.
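The same pitfall can be reproduced in .NET, where arrays also use reference equality by default. The following sketch is only an analogy for the Java-side behavior. ContentHash is a hypothetical FNV-1a style function that stands in for "hash the bytes"; this article doesn't specify the hash that SCP.NET actually uses.

```csharp
using System;
using System.Linq;

public static class ByteArrayGroupingDemo
{
    // Content-based hash (FNV-1a style). A hypothetical stand-in; the hash
    // that SCP.NET really uses is not described in this article.
    public static int ContentHash(byte[] data)
    {
        unchecked
        {
            uint hash = 2166136261;
            foreach (byte b in data)
            {
                hash = (hash ^ b) * 16777619;
            }
            return (int)hash;
        }
    }

    public static void Main()
    {
        byte[] a = { 1, 2, 3 };
        byte[] b = { 1, 2, 3 };

        // Default equality on arrays compares references, not content.
        Console.WriteLine(a.Equals(b));                       // False
        // Comparing or hashing the content treats both arrays as the same key.
        Console.WriteLine(a.SequenceEqual(b));                // True
        Console.WriteLine(ContentHash(a) == ContentHash(b));  // True
    }
}
```

A grouping that routes on the content hash sends both arrays to the same task, which is the behavior that fields grouping is expected to provide.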

SCP.NET adds a customized grouping method that uses the content of the byte[] object to do the grouping. In a specification file, the syntax looks like this example:

(bolt-spec
    {
        "spout_test" (scp-field-group :non-tx [0,1])
    }
    …
)

In the preceding specification file:

  • scp-field-group specifies that the grouping is a customized field grouping implemented by SCP.
  • :tx or :non-tx specifies whether the topology is transactional. You need this information because the starting index is different between transactional and nontransactional topologies.
  • [0,1] specifies a hash set of zero-based field IDs.

Hybrid topology

Native Storm code is written in Java. SCP.NET has enhanced Storm to let you write C# code to handle your business logic. But SCP.NET also supports hybrid topologies, which contain not only C# spouts and bolts but also Java spouts and bolts.

Specify Java spout/bolt in a specification file

You can use scp-spout and scp-bolt in a specification file to specify Java spouts and bolts. Here's an example:

(spout-spec 
  (microsoft.scp.example.HybridTopology.Generator.)
  :p 1)

Here, microsoft.scp.example.HybridTopology.Generator is the name of the Java spout class.

Specify the Java classpath in a runSpec command

If you want to submit a topology that contains Java spouts or bolts, first compile them to produce JAR files. Then specify the Java classpath that contains the JAR files when you submit the topology. Here's an example:

bin\runSpec.cmd examples\HybridTopology\HybridTopology.spec specs examples\HybridTopology\net\Target -cp examples\HybridTopology\java\target\*

Here, examples\HybridTopology\java\target\ is the folder that contains the Java spout/bolt JAR file.

Serialization and deserialization between Java and C#

An SCP component includes a Java side and a C# side. To interact with native Java spouts/bolts, serialization and deserialization must occur between the Java side and the C# side, as illustrated in the following diagram:

Diagram of a Java component sending data to an SCP component, which then sends it to a different Java component

Serialization on the Java side and deserialization on the C# side

SCP provides a default implementation for serialization on the Java side and deserialization on the C# side.

Specify the Java side's serialization method in a specification file.

(scp-bolt
    {
        "plugin.name" "HybridTopology.exe"
        "plugin.args" ["displayer"]
        "output.schema" {}
        "customized.java.serializer" ["microsoft.scp.storm.multilang.CustomizedInteropJSONSerializer"]
    })

Specify the C# side's deserialization method in your C# code.

Dictionary<string, List<Type>> inputSchema = new Dictionary<string, List<Type>>();
inputSchema.Add("default", new List<Type>() { typeof(Person) });
this.ctx.DeclareComponentSchema(new ComponentStreamSchema(inputSchema, null));
this.ctx.DeclareCustomizedDeserializer(new CustomizedInteropJSONDeserializer());

If the data type isn't too complex, this default implementation should handle most cases. You can plug in your own implementation in the following cases:

  • Your data type is too complex for the default implementation.
  • The performance of the default implementation doesn't meet your requirements.

The serialization interface on the Java side is defined as:

public interface ICustomizedInteropJavaSerializer {
    public void prepare(String[] args);
    public List<ByteBuffer> serialize(List<Object> objectList);
}
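
To make the interface concrete, here is a minimal sketch of a custom serializer. The ToStringSerializer and SerializerDemo names are hypothetical, the interface is redeclared only so the block compiles on its own, and encoding each object's string form as UTF-8 is an assumption chosen for brevity rather than what SCP's default JSON serializer does.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Redeclared from the guide so this sketch compiles on its own.
interface ICustomizedInteropJavaSerializer {
    void prepare(String[] args);
    List<ByteBuffer> serialize(List<Object> objectList);
}

// Hypothetical serializer: encodes each object's string form as UTF-8 bytes.
class ToStringSerializer implements ICustomizedInteropJavaSerializer {
    @Override
    public void prepare(String[] args) {
        // This simple sketch needs no configuration.
    }

    @Override
    public List<ByteBuffer> serialize(List<Object> objectList) {
        List<ByteBuffer> buffers = new ArrayList<>();
        for (Object o : objectList) {
            byte[] bytes = String.valueOf(o).getBytes(StandardCharsets.UTF_8);
            buffers.add(ByteBuffer.wrap(bytes));
        }
        return buffers;
    }
}

class SerializerDemo {
    public static void main(String[] args) {
        ICustomizedInteropJavaSerializer serializer = new ToStringSerializer();
        serializer.prepare(new String[0]);
        List<Object> fields = Arrays.asList("hello", 42);
        List<ByteBuffer> buffers = serializer.serialize(fields);
        // One buffer is produced per field of the tuple.
        System.out.println(buffers.size()); // prints "2"
    }
}
```

Whatever format you choose, the deserializer on the C# side has to agree with it byte for byte.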

The deserialization interface on the C# side is defined as:

public interface ICustomizedInteropCSharpDeserializer
{
    List<Object> Deserialize(List<byte[]> dataList, List<Type> targetTypes);
}

Serialization on the C# side and deserialization on the Java side

Specify the C# side's serialization method in your C# code.

this.ctx.DeclareCustomizedSerializer(new CustomizedInteropJSONSerializer()); 

Specify the Java side's deserialization method in a specification file.

(scp-spout
   {
     "plugin.name" "HybridTopology.exe"
     "plugin.args" ["generator"]
     "output.schema" {"default" ["person"]}
     "customized.java.deserializer" ["microsoft.scp.storm.multilang.CustomizedInteropJSONDeserializer" "microsoft.scp.example.HybridTopology.Person"]
   }
)

Here, "microsoft.scp.storm.multilang.CustomizedInteropJSONDeserializer" is the name of the deserializer, and "microsoft.scp.example.HybridTopology.Person" is the target class that the data is deserialized to.

You can also plug in your own implementation of a C# serializer and a Java deserializer.

This code is the interface for the C# serializer:

public interface ICustomizedInteropCSharpSerializer
{
    List<byte[]> Serialize(List<object> dataList);
}

This code is the interface for the Java deserializer:

public interface ICustomizedInteropJavaDeserializer {
    public void prepare(String[] targetClassNames);
    public List<Object> Deserialize(List<ByteBuffer> dataList);
}
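
A custom Java deserializer might look like the following sketch. Utf8StringDeserializer and DeserializerDemo are hypothetical names, the interface is redeclared so the block is self-contained, and the sketch assumes that the C# side sent each field as a UTF-8 string. A real implementation would use the class names passed to prepare to build typed objects.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Redeclared from the guide so this sketch compiles on its own.
interface ICustomizedInteropJavaDeserializer {
    void prepare(String[] targetClassNames);
    List<Object> Deserialize(List<ByteBuffer> dataList);
}

// Hypothetical deserializer: assumes every field arrives as UTF-8 text.
class Utf8StringDeserializer implements ICustomizedInteropJavaDeserializer {
    @Override
    public void prepare(String[] targetClassNames) {
        // Ignored here; a real deserializer would map these names to target types.
    }

    @Override
    public List<Object> Deserialize(List<ByteBuffer> dataList) {
        List<Object> result = new ArrayList<>();
        for (ByteBuffer buffer : dataList) {
            // duplicate() leaves the caller's buffer position untouched.
            result.add(StandardCharsets.UTF_8.decode(buffer.duplicate()).toString());
        }
        return result;
    }
}

class DeserializerDemo {
    public static void main(String[] args) {
        ICustomizedInteropJavaDeserializer deserializer = new Utf8StringDeserializer();
        deserializer.prepare(new String[] {"java.lang.String", "java.lang.String"});
        List<ByteBuffer> data = Arrays.asList(
            ByteBuffer.wrap("Alice".getBytes(StandardCharsets.UTF_8)),
            ByteBuffer.wrap("Bob".getBytes(StandardCharsets.UTF_8)));
        System.out.println(deserializer.Deserialize(data)); // prints "[Alice, Bob]"
    }
}
```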

SCP host mode

In this mode, you compile your code as a DLL and use SCPHost.exe, which SCP provides, to submit a topology. A specification file looks like this:

(scp-spout
  {
    "plugin.name" "SCPHost.exe"
    "plugin.args" ["HelloWorld.dll" "Scp.App.HelloWorld.Generator" "Get"]
    "output.schema" {"default" ["sentence"]}
  })

Here, "plugin.name" is specified as "SCPHost.exe", which is provided by the SCP SDK. SCPHost.exe accepts three parameters in the following order:

  1. The DLL name, which is "HelloWorld.dll" in this example.
  2. The class name, which is "Scp.App.HelloWorld.Generator" in this example.
  3. The name of a public static method, which can be invoked to get an instance of ISCPPlugin. In this example, it's "Get".

In host mode, compile your code as a DLL for invocation by the SCP platform. Because the platform then has full control of the whole processing logic, we recommend that you submit topologies in SCP host mode. Doing so simplifies the development experience. It also gives you more flexibility and better backward compatibility for later releases.
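
The host pattern, in which a host process is given a class name and the name of a public static factory method, can be sketched in Java with reflection. This is only an analogy: SCPHost.exe is a .NET program that loads a DLL, and its internals aren't described in this guide. GeneratorPlugin, HostSketch, and the no-argument get method are all hypothetical.

```java
import java.lang.reflect.Method;

// Hypothetical plugin class, standing in for a class such as Scp.App.HelloWorld.Generator.
class GeneratorPlugin {
    // Public static factory method that the host looks up by name.
    public static GeneratorPlugin get() {
        return new GeneratorPlugin();
    }

    public String describe() {
        return "generator plugin";
    }
}

// Hypothetical host: creates a plugin from a class name and a factory method name.
class HostSketch {
    static Object createPlugin(String className, String factoryMethod) throws Exception {
        Class<?> pluginClass = Class.forName(className);
        Method factory = pluginClass.getMethod(factoryMethod);
        // The factory is static, so no receiver object is needed.
        return factory.invoke(null);
    }

    public static void main(String[] args) throws Exception {
        Object plugin = createPlugin(GeneratorPlugin.class.getName(), "get");
        System.out.println(((GeneratorPlugin) plugin).describe()); // prints "generator plugin"
    }
}
```

Because the host owns object creation, it can control the plugin's lifecycle, which is in the spirit of the platform getting full control of the processing logic.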

SCP programming examples

HelloWorld

The following simple HelloWorld example gives a taste of SCP.NET. It uses a nontransactional topology with a spout called generator and two bolts called splitter and counter. The generator spout randomly generates sentences and emits them to splitter. The splitter bolt splits the sentences into words and emits these words to the counter bolt. The counter bolt uses a dictionary to record the number of occurrences of each word.

This example has two specification files: HelloWorld.spec and HelloWorld_EnableAck.spec. The C# code can find out whether acknowledgment is enabled by getting the pluginConf object from the Java side.

/* demo how to get pluginConf info */
if (Context.Config.pluginConf.ContainsKey(Constants.NONTRANSACTIONAL_ENABLE_ACK))
{
    enableAck = (bool)(Context.Config.pluginConf[Constants.NONTRANSACTIONAL_ENABLE_ACK]);
}
Context.Logger.Info("enableAck: {0}", enableAck);

If acknowledgment is enabled in the spout, a dictionary caches the tuples that haven't been acknowledged. If Fail is called, the failed tuple is replayed.

public void Fail(long seqId, Dictionary<string, Object> parms)
{
    Context.Logger.Info("Fail, seqId: {0}", seqId);
    if (cachedTuples.ContainsKey(seqId))
    {
        /* get the cached tuple */
        string sentence = cachedTuples[seqId];

        /* replay the failed tuple */
        Context.Logger.Info("Re-Emit: {0}, seqId: {1}", sentence, seqId);
        this.ctx.Emit(Constants.DEFAULT_STREAM_ID, new Values(sentence), seqId);
    }
    else
    {
        Context.Logger.Warn("Fail(), can't find cached tuple for seqId {0}!", seqId);
    }
}
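
The cache-and-replay pattern used by Fail can be shown in isolation. The following Java sketch is an assumption-laden model, not SCP code: ReplayingSpoutSketch is a hypothetical class, a list stands in for the output stream, and removing the cached entry on acknowledgment is inferred from the statement that the dictionary holds only unacknowledged tuples.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a spout that caches unacknowledged tuples for replay.
class ReplayingSpoutSketch {
    private final Map<Long, String> cachedTuples = new HashMap<>();
    private final List<String> emitted = new ArrayList<>(); // stands in for the output stream
    private long nextSeqId = 0;

    // Emits a sentence and caches it until it is acknowledged.
    long emit(String sentence) {
        long seqId = nextSeqId++;
        cachedTuples.put(seqId, sentence);
        emitted.add(sentence);
        return seqId;
    }

    // An acknowledged tuple no longer needs to be kept for replay.
    void ack(long seqId) {
        cachedTuples.remove(seqId);
    }

    // Replays a failed tuple if it is still cached; returns false otherwise.
    boolean fail(long seqId) {
        String sentence = cachedTuples.get(seqId);
        if (sentence == null) {
            return false;
        }
        emitted.add(sentence);
        return true;
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        ReplayingSpoutSketch spout = new ReplayingSpoutSketch();
        long first = spout.emit("a");
        long second = spout.emit("b");
        spout.ack(first);
        spout.fail(second); // "b" is emitted again
        System.out.println(spout.emitted()); // prints "[a, b, b]"
    }
}
```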

HelloWorldTx

The following HelloWorldTx example demonstrates how to implement a transactional topology. The example has one spout called generator, a batch bolt called partial-count, and a commit bolt called count-sum. The example also has three existing text files: DataSource0.txt, DataSource1.txt, and DataSource2.txt.

In each transaction, the generator spout randomly selects two of the three existing files and emits the two file names to the partial-count bolt. The partial-count bolt:

  1. Gets a file name from the received tuple.
  2. Opens the corresponding file.
  3. Counts the number of words in the file.
  4. Emits the word count to the count-sum bolt.

The count-sum bolt adds up the word counts to get the total count.
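
The division of work between the two bolts can be sketched as two small functions. This Java sketch is illustrative only: WordCountSketch is a hypothetical class, in-memory strings stand in for the contents of the data source files, and splitting on whitespace is an assumed definition of a word.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the partial-count and count-sum steps.
class WordCountSketch {
    // Role of partial-count: count the words in one file's contents.
    static int countWords(String text) {
        String trimmed = text.trim();
        return trimmed.isEmpty() ? 0 : trimmed.split("\\s+").length;
    }

    // Role of count-sum: add up the partial counts.
    static int sum(List<Integer> partialCounts) {
        int total = 0;
        for (int count : partialCounts) {
            total += count;
        }
        return total;
    }

    public static void main(String[] args) {
        int first = countWords("the quick brown fox");
        int second = countWords("hello world");
        System.out.println(sum(Arrays.asList(first, second))); // prints "6"
    }
}
```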

To achieve exactly-once semantics, the count-sum commit bolt needs to determine whether a transaction is a replayed transaction. In this example, it has the following static member variable:

public static long lastCommittedTxId = -1; 

When an ISCPBatchBolt instance is created, it gets the value of the txAttempt object from the input parameters.

public static CountSum Get(Context ctx, Dictionary<string, Object> parms)
{
    /* for transactional topology, we can get txAttempt from the input parms */
    if (parms.ContainsKey(Constants.STORM_TX_ATTEMPT))
    {
        StormTxAttempt txAttempt = (StormTxAttempt)parms[Constants.STORM_TX_ATTEMPT];
        return new CountSum(ctx, txAttempt);
    }
    else
    {
        throw new Exception("null txAttempt");
    }
}

When FinishBatch is called, lastCommittedTxId is updated if the transaction isn't a replayed transaction.

public void FinishBatch(Dictionary<string, Object> parms)
{
    /* judge whether it is a replayed transaction */
    bool replay = (this.txAttempt.TxId <= lastCommittedTxId);

    if (!replay)
    {
        /* If it is not replayed, update the totalCount and lastCommittedTxId value */
        totalCount = totalCount + this.count;
        lastCommittedTxId = this.txAttempt.TxId;
    }
    … …
}
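
The replay check can be exercised on its own to see why it gives exactly-once counting. The following Java sketch mirrors the comparison in FinishBatch, but CountSumSketch and its finishBatch signature are hypothetical simplifications: the transaction ID and batch count are passed in directly instead of coming from a txAttempt object.

```java
// Hypothetical model of the commit bolt's replay check.
class CountSumSketch {
    static long lastCommittedTxId = -1;
    static long totalCount = 0;

    // Applies a batch once; returns false if the transaction was already committed.
    static boolean finishBatch(long txId, long batchCount) {
        boolean replay = txId <= lastCommittedTxId;
        if (!replay) {
            totalCount += batchCount;
            lastCommittedTxId = txId;
        }
        return !replay;
    }

    public static void main(String[] args) {
        finishBatch(1, 10);
        finishBatch(1, 10); // replay of transaction 1: ignored
        finishBatch(2, 5);
        System.out.println(totalCount); // prints "15"
    }
}
```

The check relies on transactions being committed in increasing ID order, so any ID at or below lastCommittedTxId must be a replay.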

HybridTopology

This topology contains a Java spout and a C# bolt. It uses the default serialization and deserialization implementation provided by the SCP platform. For the specification file details, see HybridTopology.spec in the examples\HybridTopology folder. To see how to specify the Java classpath, see SubmitTopology.bat.

SCPHostDemo

This example is essentially the same as HelloWorld. The only difference is that your code is compiled as a DLL and the topology is submitted by using SCPHost.exe. For a more detailed explanation, see the SCP host mode section.

Next steps

For examples of Apache Storm topologies created using SCP, see the following articles: