SCP 编程指南SCP programming guide

SCP 是一个用于构建实时、可靠、一致和高性能的数据处理应用程序的平台。SCP is a platform to build real time, reliable, consistent, and high-performance data processing application. 它在 Apache Storm 的基础上构建而成 -- Storm 是开源软件 (OSS) 社区设计的一个流处理系统。It is built on top of Apache Storm -- a stream processing system designed by the OSS communities. Storm 由 Nathan Marz 设计,在 Twitter 上进行开源。Storm is designed by Nathan Marz and was open sourced by Twitter. 其利用 Apache ZooKeeper(另一个 Apache 项目)来实现高可靠性的分布式协调和状态管理。It leverages Apache ZooKeeper, another Apache project to enable highly reliable distributed coordination and state management.

SCP 项目不仅已移植到 Windows 的 Storm 中,还为 Windows 生态系统增加了扩展和自定义。Not only the SCP project ported Storm on Windows but also the project added extensions and customization for the Windows ecosystem. 扩展包括 .NET 开发人员经验和库,自定义包括基于 Windows 的部署。The extensions include .NET developer experience, and libraries, the customization includes Windows-based deployment.

我们无需派生 OSS 项目便可实现扩展和自定义,而且我们还可以利用在 Storm 上构建的派生生态系统。The extension and customization is done in such a way that we do not need to fork the OSS projects and we could leverage derived ecosystems built on top of Storm.

处理模型Processing model

SCP 中的数据以连续的元组流形式建模。The data in SCP is modeled as continuous streams of tuples. 通常,元组首先流入某个队列,并被提取,再通过托管在 Storm 拓扑中的业务逻辑进行转换;最后,所产生的输出可作为元组传送到其他 SCP 系统,或者提交到存储(如分布式文件系统)或数据库(如 SQL Server)。Typically the tuples flow into some queue first, then picked up, and transformed by business logic hosted inside a Storm topology, finally the output could be piped as tuples to another SCP system, or be committed to stores like distributed file system or databases like SQL Server.

馈送待处理数据的队列(在数据存储中馈送数据)示意图

在 Storm 中,应用程序拓扑定义计算图。In Storm, an application topology defines a graph of computation. 拓扑中的每个节点都包含处理逻辑,节点之间的链接指明数据流。Each node in a topology contains processing logic, and links between nodes indicate data flow. 用于将输入数据注入到拓扑中的节点名为 Spout,这些节点还可排列数据。The nodes to inject input data into the topology are called spouts, which can be used to sequence the data. 输入数据可驻留在文件日志、事务性数据库、系统性能计数器中或其他位置。具有输入和输出数据流的节点名为 Bolt。,这些节点执行实际数据过滤、选择和汇总。The input data could reside in file logs, transactional database, system performance counter etc. The nodes with both input and output data flows are called bolts, which do the actual data filtering and selections and aggregation.

SCP 支持“尽力”、“至少一次”数据处理和“恰一次”数据处理。SCP supports best efforts, at-least-once and exactly-once data processing. 在分布式流处理应用程序中,数据处理过程中可能会出现各种错误,例如,网络中断、机器故障、用户代码错误等等。“至少一次”处理会在出现错误时自动重新处理原来的数据,从而确保所有数据均至少被处理一次。In a distributed streaming processing application, various errors may happen during data processing, such as network outage, machine failure, or user code error etc. At-least-once processing ensures all data will be processed at least once by replaying automatically the same data when error happens. “至少一次”处理简单且可靠,适用于许多应用程序。At-least-once processing is simple and reliable and suits well many applications. 但是,当应用程序需要确切计数时,进行“至少一次”处理是不够的,因为相同的数据有可能用于应用程序拓扑中。However, when an application requires exact counting, at-least-once processing is insufficient since the same data could potentially be played in the application topology. 在此情况下,“恰一次”处理可确保即使数据被多次重复使用和处理,结果也是正确。In that case, exactly-once processing is designed to make sure the result is correct even when the data may be replayed and processed multiple times.

通过 SCP,.NET 开发人员可以开发实时数据处理应用程序,同时在后台通过 Storm 利用 Java 虚拟机 (JVM)。SCP enables .NET developers to develop real time data process applications while leveraging on Java Virtual Machine (JVM) with Storm under the covers. .NET 和 JVM 通过 TCP 本地套接字进行通信。The .NET and JVM communicate via TCP local sockets. 基本上,每个 Spout/Bolt 都是一个 .Net/Java 进程对,在这些进程对中,用户逻辑作为插件在 .Net 进程中运行。Basically each Spout/Bolt is a .Net/Java process pair, where the user logic runs in .Net process as a plugin.

若要在 SCP 上开发数据处理应用程序,需要执行以下几个步骤:To build a data processing application on top of SCP, several steps are needed:

  • 设计并实施 Spout,以从队列中提取数据。Design and implement the Spouts to pull in data from queue.
  • 设计并实施 Bolt 以处理输入数据,以及将数据保存到外部存储(如数据库)中。Design and implement Bolts to process the input data, and save data to external stores such as a Database.
  • 设计拓扑,提交并运行拓扑。Design the topology, then submit and run the topology. 拓扑定义顶点以及顶点之间的数据流。The topology defines vertexes and the data flows between the vertexes. SCP 会采用拓扑规范并将拓扑部署在 Storm 群集上,在 Storm 群集中,每个顶点在一个逻辑节点上运行。SCP will take the topology specification and deploy it on a Storm cluster, where each vertex runs on one logical node. Storm 任务计划程序会处理故障转移和缩放。The failover and scaling will be taken care of by the Storm task scheduler.

本文档使用一些简单示例来介绍如何使用 SCP 开发数据处理应用程序。This document uses some simple examples to walk through how to build data processing application with SCP.

SCP 插件接口SCP Plugin Interface

SCP 插件(或应用程序)是独立的 EXE,可以在开发阶段在 Visual Studio 中运行,也可以在生产环境中部署后插入到 Storm 管道中。SCP plugins (or applications) are standalone EXEs that can both run inside Visual Studio during the development phase, and be plugged into the Storm pipeline after deployment in production. 编写 SCP 插件就像编写任何其他标准的 Windows 控制台应用程序一样。Writing the SCP plugin is just the same as writing any other standard Windows console applications. SCP.NET 平台声明用于 Spout/Bolt 的接口,用户插件代码应实施这些接口。SCP.NET platform declares some interface for spout/bolt, and the user plugin code should implement these interfaces. 这种设计的主要目的是使用户可以专注于自己的业务逻辑,其他事情则由 SCP.NET 平台处理。The main purpose of this design is that the user can focus on their own business logics, and leaving other things to be handled by SCP.NET platform.

用户插件代码应实施以下其中一个接口,具体取决于拓扑是事务性还是非事务性,以及组件是 Spout 还是 Bolt。The user plugin code should implement one of the followings interfaces, depends on whether the topology is transactional or non-transactional, and whether the component is a spout or bolt.

  • ISCPSpoutISCPSpout
  • ISCPBoltISCPBolt
  • ISCPTxSpoutISCPTxSpout
  • ISCPBatchBoltISCPBatchBolt

ISCPPluginISCPPlugin

ISCPPlugin 是各种插件的公共接口。ISCPPlugin is the common interface for all kinds of plugins. 目前,该接口是一个虚拟接口。Currently, it is a dummy interface.

public interface ISCPPlugin 
{
}

ISCPSpoutISCPSpout

ISCPSpout 是适用于非事务性 Spout 的接口。ISCPSpout is the interface for non-transactional spout.

 public interface ISCPSpout : ISCPPlugin                    
 {
     void NextTuple(Dictionary<string, Object> parms);         
     void Ack(long seqId, Dictionary<string, Object> parms);   
     void Fail(long seqId, Dictionary<string, Object> parms);  
 }

调用 NextTuple() 时,C# 用户代码会发送一个或多个元组。When NextTuple() is called, the C# user code can emit one or more tuples. 如果没有要发送的数据,此方法应返回而不发送任何信息。If there is nothing to emit, this method should return without emitting anything. 请注意,如果 C# 进程的单一线程出现紧凑循环,将会调用 NextTuple()Ack()Fail()It should be noted that NextTuple(), Ack(), and Fail() are all called in a tight loop in a single thread in C# process. 如果没有要发送的元组,最好短暂地将 NextTuple 置于休眠状态(例如 10 毫秒),以免浪费太多 CPU。When there are no tuples to emit, it is courteous to have NextTuple sleep for a short amount of time (such as 10 milliseconds) so as not to waste too much CPU.

仅在规范文件中启用了确认机制的情况下,才会调用 Ack()Fail()Ack() and Fail() are called only when ack mechanism is enabled in spec file. seqId 用于识别已确认或失败的元组。The seqId is used to identify the tuple that is acknowledged or failed. 因此,如果在非事务性拓扑中启用了确认功能,应在 Spout 中使用以下 emit 函数:So if ack is enabled in non-transactional topology, the following emit function should be used in Spout:

public abstract void Emit(string streamId, List<object> values, long seqId); 

如果非事务性拓扑不支持确认功能,则可以将 Ack()Fail() 保留为空函数。If ack is not supported in non-transactional topology, the Ack() and Fail() can be left as empty function.

这些函数中的 parms 输入参数是空字典,保留供以后使用。The parms input parameter in these functions is an empty Dictionary, it is reserved for future use.

ISCPBoltISCPBolt

ISCPBolt 是适用于非事务性 Bolt 的接口。ISCPBolt is the interface for non-transactional bolt.

public interface ISCPBolt : ISCPPlugin 
{
void Execute(SCPTuple tuple);           
}

有新的元组可用时,会调用 Execute() 函数来处理该元组。When new tuple is available, the Execute() function is called to process it.

ISCPTxSpoutISCPTxSpout

ISCPTxSpout 是适用于事务性 Spout 的接口。ISCPTxSpout is the interface for transactional spout.

public interface ISCPTxSpout : ISCPPlugin
{
    void NextTx(out long seqId, Dictionary<string, Object> parms);  
    void Ack(long seqId, Dictionary<string, Object> parms);         
    void Fail(long seqId, Dictionary<string, Object> parms);        
}

与非事务性接口一样,如果 C# 进程的单一线程出现紧凑循环,将会调用 NextTx()Ack()Fail()Just like their non-transactional counter-part, NextTx(), Ack(), and Fail() are all called in a tight loop in a single thread in C# process. 如果没有要发送的数据,最好短暂地将 NextTx 置于休眠状态(例如 10 毫秒),以免浪费太多 CPU。When there are no data to emit, it is courteous to have NextTx sleep for a short amount of time (10 milliseconds) so as not to waste too much CPU.

调用 NextTx() 可启动新的事务;输出参数 seqId 用于识别事务,该参数也用于 Ack()Fail() 中。NextTx() is called to start a new transaction, the out parameter seqId is used to identify the transaction, which is also used in Ack() and Fail(). NextTx()中,用户可以将数据发送到 Java 端。In NextTx(), user can emit data to Java side. 然后,数据会被存储在 ZooKeeper 中,以支持重用。The data is stored in ZooKeeper to support replay. ZooKeeper 的容量有限,因此,用户应该只发送元数据,而不应该发送事务性 Spout 中的批量数据。Because the capacity of ZooKeeper is limited, user should only emit metadata, not bulk data in transactional spout.

如果事务失败,Storm 会自动重新处理失败的事务,因此,一般情况下不应调用 Fail()Storm will replay a transaction automatically if it fails, so Fail() should not be called in normal case. 但是,如果 SCP 可以检查事务性 Spout 发送的元数据,当元数据无效时,SCP 可以调用 Fail()But if SCP can check the metadata emitted by transactional spout, it can call Fail() when the metadata is invalid.

这些函数中的 parms 输入参数是空字典,保留供以后使用。The parms input parameter in these functions is an empty Dictionary, it is reserved for future use.

ISCPBatchBoltISCPBatchBolt

ISCPBatchBolt 是适用于事务性 Bolt 的接口。ISCPBatchBolt is the interface for transactional bolt.

public interface ISCPBatchBolt : ISCPPlugin           
{
    void Execute(SCPTuple tuple);
    void FinishBatch(Dictionary<string, Object> parms);  
}

Execute()Execute() is called when there is new tuple arriving at the bolt. FinishBatch()FinishBatch() is called when this transaction is ended. 保留 parms 输入参数供以后使用。The parms input parameter is reserved for future use.

对于事务性拓扑,有一个重要概念 - StormTxAttemptFor transactional topology, there is an important concept – StormTxAttempt. 它有两个字段:TxIdAttemptIdIt has two fields, TxId and AttemptId. TxId 用于识别特定事务,对于给定的事务,如果事务失败,可能会多次尝试对事务进行重新处理。TxId is used to identify a specific transaction, and for a given transaction, there may be multiple attempts if the transaction fails and is replayed. SCP.NET 新建一个 ISCPBatchBolt 对象来处理每个 StormTxAttempt,就像 Storm 在 Java 中所做的一样。SCP.NET creates a new ISCPBatchBolt object to process each StormTxAttempt, just like what Storm does in Java. 这种设计的目的是支持并行事务处理。The purpose of this design is to support parallel transactions processing. 用户应记住,事务处理尝试结束后,应销毁相应的 ISCPBatchBolt 对象并回收所产生的垃圾。User should keep it in mind that if transaction attempt is finished, the corresponding ISCPBatchBolt object is destroyed and garbage collected.

对象模型Object Model

SCP.NET 还会提供可供开发人员用于编程的简单密钥对象集。SCP.NET also provides a simple set of key objects for developers to program with. 这些对象是 Context、StateStore 和 SCPRuntime。They are Context, StateStore, and SCPRuntime. 本节余下部分会讨论这些对象。They are discussed in the rest part of this section.

上下文Context

上下文为应用程序提供运行环境。Context provides a running environment to the application. 每个 ISCPPlugin 实例 (ISCPSpout/ISCPBolt/ISCPTxSpout/ISCPBatchBolt) 都有相应的上下文实例。Each ISCPPlugin instance (ISCPSpout/ISCPBolt/ISCPTxSpout/ISCPBatchBolt) has a corresponding Context instance. 上下文提供的功能可分为两部分:(1) 静态部分,可用于整个 C# 进程;(2) 动态部分,仅可用于特定上下文实例。The functionality provided by Context can be divided into two parts: (1) the static part, which is available in the whole C# process, (2) the dynamic part, which is only available for the specific Context instance.

静态部分Static Part

public static ILogger Logger = null;
public static SCPPluginType pluginType;                      
public static Config Config { get; set; }                    
public static TopologyContext TopologyContext { get; set; }  

Logger 用于日志记录。Logger is provided for log purpose.

pluginType 用于指明 C# 进程的插件类型。pluginType is used to indicate the plugin type of the C# process. 如果 C# 进程在本地测试模式下运行(没有 Java),则插件类型是 SCP_NET_LOCALIf the C# process is run in local test mode (without Java), the plugin type is SCP_NET_LOCAL.

public enum SCPPluginType 
{
    SCP_NET_LOCAL = 0,       
    SCP_NET_SPOUT = 1,       
    SCP_NET_BOLT = 2,        
    SCP_NET_TX_SPOUT = 3,   
    SCP_NET_BATCH_BOLT = 4  
}

Config 用于从 Java 端获取配置参数。Config is provided to get configuration parameters from Java side. C# 插件初始化时,会从 Java 端传递这些参数。The parameters are passed from Java side when C# plugin is initialized. Config 参数分为两部分:stormConfpluginConfThe Config parameters are divided into two parts: stormConf and pluginConf.

public Dictionary<string, Object> stormConf { get; set; }  
public Dictionary<string, Object> pluginConf { get; set; }  

stormConf 是 Storm 定义的参数,pluginConf 是 SCP 定义的参数。stormConf is parameters defined by Storm and pluginConf is the parameters defined by SCP. 例如:For example:

public class Constants
{
    … …

    // constant string for pluginConf
    public static readonly String NONTRANSACTIONAL_ENABLE_ACK = "nontransactional.ack.enabled";  

    // constant string for stormConf
    public static readonly String STORM_ZOOKEEPER_SERVERS = "storm.zookeeper.servers";           
    public static readonly String STORM_ZOOKEEPER_PORT = "storm.zookeeper.port";                 
}

TopologyContext 用于获取拓扑上下文,对于具有多种并行执行能力的组件最有用。TopologyContext is provided to get the topology context, it is most useful for components with multiple parallelism. 以下是示例:Here is an example:

//demo how to get TopologyContext info
if (Context.pluginType != SCPPluginType.SCP_NET_LOCAL)                      
{
    Context.Logger.Info("TopologyContext info:");
    TopologyContext topologyContext = Context.TopologyContext;                    
    Context.Logger.Info("taskId: {0}", topologyContext.GetThisTaskId());          
    taskIndex = topologyContext.GetThisTaskIndex();
    Context.Logger.Info("taskIndex: {0}", taskIndex);
    string componentId = topologyContext.GetThisComponentId();                    
    Context.Logger.Info("componentId: {0}", componentId);
    List<int> componentTasks = topologyContext.GetComponentTasks(componentId);  
    Context.Logger.Info("taskNum: {0}", componentTasks.Count);                    
}

动态部分Dynamic Part

以下接口与某个上下文实例相关。The following interfaces are pertinent to a certain Context instance. 上下文实例由 SCP.NET 平台创建,会被传递到用户代码:The Context instance is created by SCP.NET platform and passed to the user code:

// Declare the Output and Input Stream Schemas

public void DeclareComponentSchema(ComponentStreamSchema schema);   

// Emit tuple to default stream.
public abstract void Emit(List<object> values);                   

// Emit tuple to the specific stream.
public abstract void Emit(string streamId, List<object> values);  

对于支持确认功能的非事务性 Spout,提供了以下方法:For non-transactional spout supporting ack, the following method is provided:

// for non-transactional Spout which supports ack
public abstract void Emit(string streamId, List<object> values, long seqId);  

对于支持确认功能的非事务性 Bolt,应对其接收到的元组明确执行 Ack()Fail()For non-transactional bolt supporting ack, it should explicitly Ack() or Fail() the tuple it received. 发送新元组时,还必须指定新元组的定位点。And when emitting new tuple, it must also specify the anchors of the new tuple. 对于此类 Bolt,提供了以下方法。The following methods are provided.

public abstract void Emit(string streamId, IEnumerable<SCPTuple> anchors, List<object> values); 
public abstract void Ack(SCPTuple tuple);
public abstract void Fail(SCPTuple tuple);

StateStoreStateStore

StateStore 提供元数据服务、单调序列生成和无等待协调。StateStore provides metadata services, monotonic sequence generation, and wait-free coordination. 可以在 StateStore中构建高级分布式并发抽象,包括分布式锁、分布式队列、屏障和事务服务。Higher-level distributed concurrency abstractions can be built on StateStore, including distributed locks, distributed queues, barriers, and transaction services.

SCP 应用程序可使用 State 对象在 Apache ZooKeeper 中保留某些信息,尤其是适用于事务性拓扑的信息。SCP applications may use the State object to persist some information in Apache ZooKeeper, especially for transactional topology. 这样做时,如果事务性 Spout 崩溃并重新启动,它可以从 ZooKeeper 检索必要信息并重新开始数据传输。Doing so, if transactional spout crashes and restart, it can retrieve the necessary information from ZooKeeper and restart the pipeline.

StateStore 对象主要提供以下方法:The StateStore object mainly has these methods:

/// <summary>
/// Static method to retrieve a state store of the given path and connStr 
/// </summary>
/// <param name="storePath">StateStore Path</param>
/// <param name="connStr">StateStore Address</param>
/// <returns>Instance of StateStore</returns>
public static StateStore Get(string storePath, string connStr);

/// <summary>
/// Create a new state object in this state store instance
/// </summary>
/// <returns>State from StateStore</returns>
public State Create();

/// <summary>
/// Retrieve all states that were previously uncommitted, excluding all aborted states 
/// </summary>
/// <returns>Uncommitted States</returns>
public IEnumerable<State> GetUnCommitted();

/// <summary>
/// Get all the States in the StateStore
/// </summary>
/// <returns>All the States</returns>
public IEnumerable<State> States();

/// <summary>
/// Get state or registry object
/// </summary>
/// <param name="info">Registry Name(Registry only)</param>
/// <typeparam name="T">Type, Registry or State</typeparam>
/// <returns>Return Registry or State</returns>
public T Get<T>(string info = null);

/// <summary>
/// List all the committed states
/// </summary>
/// <returns>Registries contain the Committed State </returns> 
public IEnumerable<Registry> Committed();

/// <summary>
/// List all the Aborted State in the StateStore
/// </summary>
/// <returns>Registries contain the Aborted State</returns>
public IEnumerable<Registry> Aborted();

/// <summary>
/// Retrieve an existing state object from this state store instance 
/// </summary>
/// <returns>State from StateStore</returns>
/// <typeparam name="T">stateId, id of the State</typeparam>
public State GetState(long stateId)

State 对象主要提供以下方法:The State object mainly has these methods:

/// <summary>
/// Set the status of the state object to commit 
/// </summary>
public void Commit(bool simpleMode = true); 

/// <summary>
/// Set the status of the state object to abort 
/// </summary>
public void Abort();

/// <summary>
/// Put an attribute value under the give key 
/// </summary>
/// <param name="key">Key</param> 
/// <param name="attribute">State Attribute</param> 
public void PutAttribute<T>(string key, T attribute); 

/// <summary>
/// Get the attribute value associated with the given key 
/// </summary>
/// <param name="key">Key</param> 
/// <returns>State Attribute</returns>               
public T GetAttribute<T>(string key);                    

对于 Commit() 方法,如果 simpleMode 设置为 true,此方法会删除 ZooKeeper 中相应的 ZNode。For the Commit() method, when simpleMode is set to true, it deletes the corresponding ZNode in ZooKeeper. 否则,会删除当前 ZNode 并在 COMMITTED_PATH 中添加新节点。Otherwise, it deletes the current ZNode, and adding a new node in the COMMITTED_PATH.

SCPRuntimeSCPRuntime

SCPRuntime 提供以下两种方法:SCPRuntime provides the following two methods:

public static void Initialize();

public static void LaunchPlugin(newSCPPlugin createDelegate);  

Initialize() 用于初始化 SCP 运行时环境。Initialize() is used to initialize the SCP runtime environment. 在此方法中,C# 进程会连接到 Java 端,并会获取配置参数和拓扑上下文。In this method, the C# process connects to the Java side, and gets configuration parameters and topology context.

LaunchPlugin() 用于启动消息处理循环。LaunchPlugin() is used to kick off the message processing loop. 在此循环中,C# 插件接收来自 Java 端的消息(包括元组和控制信号),然后处理接收到的消息,也许还会调用用户代码提供的接口方法。In this loop, the C# plugin receives messages form Java side (including tuples and control signals), and then process the messages, perhaps calling the interface method provide by the user code. LaunchPlugin() 方法的输入参数是委托参数,可返回实施 ISCPSpout/IScpBolt/ISCPTxSpout/ISCPBatchBolt 接口的对象。The input parameter for method LaunchPlugin() is a delegate that can return an object that implement ISCPSpout/IScpBolt/ISCPTxSpout/ISCPBatchBolt interface.

public delegate ISCPPlugin newSCPPlugin(Context ctx, Dictionary\<string, Object\> parms); 

对于 ISCPBatchBolt,可从 parms 获取 StormTxAttempt,并利用它来判断某项操作是否是重复尝试。For ISCPBatchBolt, we can get StormTxAttempt from parms, and use it to judge whether it is a replayed attempt. 对尝试重播的检查通常在 Commit Bolt 中完成,并在 HelloWorldTx 示例中进行演示。The check for a replay attempt is often done at the commit bolt, and it is demonstrated in the HelloWorldTx example.

一般来说,SCP 插件可在以下两种模式下运行:Generally speaking, the SCP plugins may run in two modes here:

  1. 本地测试模式:在此模式下,在开发阶段中,SCP 插件(C# 用户代码)在 Visual Studio 中运行。Local Test Mode: In this mode, the SCP plugins (the C# user code) run inside Visual Studio during the development phase. LocalContext 可用于此模式,其提供方法对发送到本地文件的元组进行序列化并将这些元组重新读取到内存。LocalContext can be used in this mode, which provides method to serialize the emitted tuples to local files, and read them back to memory.

     public interface ILocalContext
     {
         List\<SCPTuple\> RecvFromMsgQueue();
         void WriteMsgQueueToFile(string filepath, bool append = false);  
         void ReadFromFileToMsgQueue(string filepath);                    
     }
    
  2. 常规模式:在此模式下,SCP 插件由 Storm Java 进程启动。Regular Mode: In this mode, the SCP plugins are launched by storm java process.

    下面是一个 SCP 插件启动示例:Here is an example of launching SCP plugin:

     namespace Scp.App.HelloWorld
     {
     public class Generator : ISCPSpout
     {
         … …
         public static Generator Get(Context ctx, Dictionary<string, Object> parms)
         {
         return new Generator(ctx);
         }
     }
    
     class HelloWorld
     {
         static void Main(string[] args)
         {
         /* Setting the environment variable here can change the log file name */
         System.Environment.SetEnvironmentVariable("microsoft.scp.logPrefix", "HelloWorld");
    
         SCPRuntime.Initialize();
         SCPRuntime.LaunchPlugin(new newSCPPlugin(Generator.Get));
         }
     }
     }
    

拓扑规范语言Topology Specification Language

SCP 拓扑规范是一种特定于域的语言,用于描述和配置 SCP 拓扑。SCP Topology Specification is a domain-specific language for describing and configuring SCP topologies. 它基于 Storm 的 Clojure DSL (https://storm.incubator.apache.org/documentation/Clojure-DSL.html) 并通过 SCP 进行扩展。It is based on Storm's Clojure DSL (https://storm.incubator.apache.org/documentation/Clojure-DSL.html) and is extended by SCP.

拓扑规范可通过 runspec 命令直接提交到 Storm 群集进行执行。Topology specifications can be submitted directly to storm cluster for execution via the runspec command.

SCP.NET 添加了以下函数来定义事务性拓扑:SCP.NET has added the following functions to define Transactional Topologies:

新函数New Functions ParametersParameters 说明Description
tx-topolopytx-topolopy topology-nametopology-name
spout-mapspout-map
bolt-mapbolt-map
使用拓扑名称、Spout 定义图和 Bolt 定义图来定义事务性拓扑 Define a transactional topology with the topology name,  spouts definition map and the bolts definition map
scp-tx-spoutscp-tx-spout exec-nameexec-name
argsargs
fieldsfields
定义事务性 Spout。Define a transactional spout. 使用 args 运行带有 exec-name 的应用程序。It runs the application with exec-name using args.

fields 是用于 Spout 的输出字段The fields is the Output Fields for spout
scp-tx-batch-boltscp-tx-batch-bolt exec-nameexec-name
argsargs
fieldsfields
定义事务性批处理 Bolt。Define a transactional Batch Bolt. 使用 args 运行带有 exec-name 的应用程序。It runs the application with exec-name using args.

Fields 是用于 Bolt 的输出字段。The Fields is the Output Fields for bolt.
scp-tx-commit-boltscp-tx-commit-bolt exec-nameexec-name
argsargs
fieldsfields
定义事务性 Committer Bolt。Define a transactional commit bolt. 使用 args 运行带有 exec-name 的应用程序。It runs the application with exec-name using args.

fields 是用于 Bolt 的输出字段The fields is the Output Fields for bolt
nontx-topolopynontx-topolopy topology-nametopology-name
spout-mapspout-map
bolt-mapbolt-map
使用拓扑名称、Spout 定义图和 Bolt 来定义非事务性拓扑 Define a nontransactional topology with the topology name,  spouts definition map and the bolts definition map
scp-spoutscp-spout exec-nameexec-name
argsargs
fieldsfields
参数parameters
定义非事务性 Spout。Define a nontransactional spout. 使用 args 运行带有 exec-name 的应用程序。It runs the application with exec-name using args.

fields 是用于 Spout 的输出字段The fields is the Output Fields for spout

parameters 为可选,可使用它指定某些参数,例如“nontransactional.ack.enabled”。The parameters are optional, using it to specify some parameters such as "nontransactional.ack.enabled".
scp-boltscp-bolt exec-nameexec-name
argsargs
fieldsfields
参数parameters
定义非事务性 Bolt。Define a nontransactional Bolt. 使用 args 运行带有 exec-name 的应用程序。It runs the application with exec-name using args.

fields 是用于 Bolt 的输出字段The fields is the Output Fields for bolt

parameters 为可选,可使用它指定某些参数,例如“nontransactional.ack.enabled”。The parameters are optional, using it to specify some parameters such as "nontransactional.ack.enabled".

SCP.NET 定义了以下关键字:SCP.NET has the following keywords defined:

关键字Key Words 说明Description
:name:name 定义拓扑名称Define the Topology Name
:topology:topology 使用上述函数和内置函数定义拓扑。Define the Topology using the previous functions and build in ones.
:p:p 定义每个 Spout 或 Bolt 的并行提示。Define the parallelism hint for each spout or bolt.
:config:config 定义配置参数或更新现有的配置参数Define configure parameter or update the existing ones
:schema:schema 定义流架构。Define the Schema of Stream.

常用参数:And frequently used parameters:

参数Parameter 说明Description
"plugin.name""plugin.name" C# 插件的 exe 文件名exe file name of the C# plugin
"plugin.args""plugin.args" plugin argsplugin args
"output.schema""output.schema" 输出架构Output schema
"nontransactional.ack.enabled""nontransactional.ack.enabled" 非事务性拓扑是否已启用确认功能Whether ack is enabled for nontransactional topology

runspec 命令会与位元一起部署,其用法如下:The runspec command is deployed together with the bits, the usage is like:

.\bin\runSpec.cmd
usage: runSpec [spec-file target-dir [resource-dir] [-cp classpath]]
ex: runSpec examples\HelloWorld\HelloWorld.spec specs examples\HelloWorld\Target

resource-dir 参数是可选参数,在想要启动 C# 应用程序时,需要指定此参数,此目录将包含应用程序、依赖项和配置。The resource-dir parameter is optional, you need to specify it when you want to plug a C# application, and this directory contains the application, the dependencies, and configurations.

classpath 参数也是可选参数。The classpath parameter is also optional. 当规范文件包含 Java Spout 或 Bolt 时,此参数用于指定 Java classpath。It is used to specify the Java classpath if the spec file contains Java Spout or Bolt.

其他功能Miscellaneous Features

输入和输出架构声明Input and Output Schema Declaration

用户可以在 C# 进程中发送元组;平台需要将元组序列化为 byte[],传输到 Java 端;而 Storm 会将此元组传输到目标。Users can emit tuples in C# processes, the platform needs to serialize the tuple into byte[], transfer to Java side, and Storm will transfer this tuple to the targets. 与此同时,在下游组件中,C# 进程会接收从 Java 端返回的元组,并将其转换为平台使用的原始类型 - 所有这些操作都在平台的后台进行。Meanwhile in downstream components, C# processes will receive tuples back from java side, and convert it to the original types by platform, all these operations are hidden by the Platform.

为了支持序列化和反序列化,用户代码需要声明输入和输出的架构。To support the serialization and deserialization, user code needs to declare the schema of the inputs and outputs.

输入/输出流架构被定义为字典。The input/output stream schema is defined as a dictionary. 键为 StreamId。The key is the StreamId. 值为列的类型。The value is the Types of the columns. 组件可以声明多个数据流。The component can have multi-streams declared.

public class ComponentStreamSchema
{
    public Dictionary<string, List<Type>> InputStreamSchema { get; set; }
    public Dictionary<string, List<Type>> OutputStreamSchema { get; set; }
    public ComponentStreamSchema(Dictionary<string, List<Type>> input, Dictionary<string, List<Type>> output)
    {
        InputStreamSchema = input;
        OutputStreamSchema = output;
    }
}

在上下文对象中,添加了以下 API:In Context object, we have the following API added:

public void DeclareComponentSchema(ComponentStreamSchema schema)

开发者必须确保发送的元组符合为该流定义的架构,否则,系统会引发运行时异常。Developers must ensure that the tuples emitted obey the schema defined for that stream, otherwise the system will throw a runtime exception.

多流支持Multi-Stream Support

SCP 支持用户代码同时向多个不同数据流发送元组或同时接收来自多个不同数据流的元组。SCP supports user code to emit or receive from multiple distinct streams at the same time. 这种支持在上下文对象中体现为,Emit 方法采用可选的 stream ID 参数。The support reflects in the Context object as the Emit method takes an optional stream ID parameter.

在 SCP.NET 上下文对象中添加了两种方法。Two methods in the SCP.NET Context object have been added. 这些方法用于发送一个或多个元组以指定 StreamId。They are used to emit Tuple or Tuples to specify StreamId. StreamId 是字符串,必须在 C# 和拓扑定义规范中保持一致。The StreamId is a string and it needs to be consistent in both C# and the Topology Definition Spec.

    /* Emit tuple to the specific stream. */
    public abstract void Emit(string streamId, List<object> values);

    /* for non-transactional Spout only */
    public abstract void Emit(string streamId, List<object> values, long seqId);

向不存在的数据流发送元组会导致运行时异常。The emitting to a non-existing stream causes runtime exceptions.

字段分组Fields Grouping

Storm 中内置的字段分组在 SCP.NET 中无法正常使用。The built-in Fields Grouping in Storm is not working properly in SCP.NET. 在 Java 代理端,所有字段数据类型实际上都是 byte[],字段分组会使用 byte[] 对象来进行分组。On the Java Proxy side, all the fields data types are actually byte[], and the fields grouping uses the byte[] object hash code to perform the grouping. byte[] 对象哈希代码是该对象在内存中的地址。The byte[] object hash code is the address of this object in memory. 因此,如果两个 byte[] 对象共享相同的内容但地址不相同,分组将会不正确。So the grouping will be wrong for two byte[] objects that share the same content but not the same address.

SCP.NET 添加了一个自定义的分组方法,该方法会使用 byte[] 的内容来进行分组。SCP.NET adds a customized grouping method, and it uses the content of the byte[] to do the grouping. SPEC 文件中,语法如下所示:In SPEC file, the syntax is like:

(bolt-spec
    {
        "spout_test" (scp-field-group :non-tx [0,1])
    }
    …
)

其中:Here,

  1. "scp-field-group" 表示“SCP 实现的自定义字段分组”。"scp-field-group" means "Customized field grouping implemented by SCP".
  2. ":tx" 或 ":non-tx" 表示是否是事务性拓扑。":tx" or ":non-tx" means if it's transactional topology. 我们需要此信息,因为事务性拓扑和非事务性拓扑的起始索引不一样。We need this information since the starting index is different in tx vs. non-tx topologies.
  3. [0,1] 表示从 0 开始的哈希集或字段 ID。[0,1] means a hashset of field Ids, starting from 0.

混合拓扑Hybrid topology

本机 Storm 是用 Java 编写的。The native Storm is written in Java. SCP.Net 已对其进行增强,使 C# 开发者能够编写 C# 代码来处理其业务逻辑。And SCP.Net has enhanced it to enable C# developers to write C# code to handle their business logic. 但它也支持混合拓扑,这种拓扑不仅包含 C# Spout/Bolt,还包含 Java Spout/Bolt。But it also supports hybrid topologies, which contains not only C# spouts/bolts, but also Java Spout/Bolts.

在规范文件中指定 Java Spout/BoltSpecify Java Spout/Bolt in spec file

在规范文件中,“scp-spout”和“scp-bolt”也可用于指定 Java Spout 和 Bolt;下面是一个示例:In spec file, "scp-spout" and "scp-bolt" can also be used to specify Java Spouts and Bolts, here is an example:

(spout-spec 
  (microsoft.scp.example.HybridTopology.Generator.)           
  :p 1)

其中, microsoft.scp.example.HybridTopology.Generator 是 Java Spout 类的名称。Here microsoft.scp.example.HybridTopology.Generator is the name of the Java Spout class.

在 runSpec 命令中指定 Java ClasspathSpecify Java Classpath in runSpec Command

如果想要提交包含 Java Spout 或 Bolt 的拓扑,则首先需要编译 Java Spout 或 Bolt 并获取 Jar 文件。If you want to submit topology containing Java Spouts or Bolts, you need to first compile the Java Spouts or Bolts and get the Jar files. 然后,应在提交拓扑时指定包含 Jar 文件的 Java Classpath。Then you should specify the java classpath that contains the Jar files when submitting topology. 以下是示例:Here is an example:

bin\runSpec.cmd examples\HybridTopology\HybridTopology.spec specs examples\HybridTopology\net\Target -cp examples\HybridTopology\java\target\*

其中,examples\HybridTopology\java\target\ 是包含 Java Spout/Bolt Jar 文件的文件夹。Here examples\HybridTopology\java\target\ is the folder containing the Java Spout/Bolt Jar file.

Java 和 C# 之间的序列化和反序列化Serialization and Deserialization between Java and C#

SCP 组件包括 Java 端和 C# 端。SCP component includes Java side and C# side. 若要与本机 Java Spout/Bolt 交互,必须在 Java 端与 C# 端之间执行序列化/反序列化,如下图中所示。In order to interact with native Java Spouts/Bolts, Serialization/Deserialization must be carried out between Java side and C# side, as illustrated in the following graph.

Java 组件示意图,发送到 SCP 组件,发送到 Java 组件

  1. Java 端的序列化和 C# 端的反序列化Serialization in Java side and Deserialization in C# side

    最初,默认情况下是在 Java 端进行序列化并在 C# 端进行反序列化。First we provide default implementation for serialization in Java side and deserialization in C# side. 可以在规范文件中指定 Java 端的序列化方法:The serialization method in Java side can be specified in SPEC file:

    (scp-bolt
        {
            "plugin.name" "HybridTopology.exe"
            "plugin.args" ["displayer"]
            "output.schema" {}
            "customized.java.serializer" ["microsoft.scp.storm.multilang.CustomizedInteropJSONSerializer"]
        })
    

    应在 C# 用户代码中指定 C# 端的反序列化方法:The deserialization method in C# side should be specified in C# user code:

    Dictionary<string, List<Type>> inputSchema = new Dictionary<string, List<Type>>();
    inputSchema.Add("default", new List<Type>() { typeof(Person) });
    this.ctx.DeclareComponentSchema(new ComponentStreamSchema(inputSchema, null));
    this.ctx.DeclareCustomizedDeserializer(new CustomizedInteropJSONDeserializer());            
    

    如果数据类型不是太复杂,这种默认实现方法应该能够应对大多数情况。This default implementation should handle most cases provided the data type is not too complex. 对于某些情况,由于用户数据类型太复杂,或者由于我们的默认实现性能不符合用户要求,用户可能会进行自定义实施。For certain cases, either because the user data type is too complex, or because the performance of our default implementation does not meet the user's requirement, users can plug-in their own implementation.

    Java 端的序列化接口如下定义:The serialize interface in java side is defined as:

    public interface ICustomizedInteropJavaSerializer {
        public void prepare(String[] args);
        public List<ByteBuffer> serialize(List<Object> objectList);
    }
    

    C# 端的反序列化接口如下定义:The deserialize interface in C# side is defined as:

    公共接口 ICustomizedInteropCSharpDeserializerpublic interface ICustomizedInteropCSharpDeserializer

    public interface ICustomizedInteropCSharpDeserializer
    {
        List<Object> Deserialize(List<byte[]> dataList, List<Type> targetTypes);
    }
    
  2. C# 端的序列化和 Java 端的反序列化Serialization in C# side and Deserialization in Java side

    应在 C# 用户代码中指定 C# 端的序列化方法:The serialization method in C# side should be specified in C# user code:

    this.ctx.DeclareCustomizedSerializer(new CustomizedInteropJSONSerializer()); 
    

    应在 SPEC 文件中指定 Java 端的反序列化方法:The Deserialization method in Java side should be specified in SPEC file:

    (scp-spout(scp-spout

    {
      "plugin.name" "HybridTopology.exe"
      "plugin.args" ["generator"]
      "output.schema" {"default" ["person"]}
      "customized.java.deserializer" ["microsoft.scp.storm.multilang.CustomizedInteropJSONDeserializer" "microsoft.scp.example.HybridTopology.Person"]
    })
    

    其中,"microsoft.scp.storm.multilang.CustomizedInteropJSONDeserializer" 是反序列化程序的名称,"microsoft.scp.example.HybridTopology.Person" 是数据要反序列化成的目标类。Here "microsoft.scp.storm.multilang.CustomizedInteropJSONDeserializer" is the name of Deserializer, and "microsoft.scp.example.HybridTopology.Person" is the target class the data is deserialized to.

    用户也可以外挂自己的 C# 序列化程序和 Java 反序列化程序的实现。User can also plug in their own implementation of C# serializer and Java Deserializer. 此代码是 C# 序列化程序的接口:This code is the interface for C# serializer:

    public interface ICustomizedInteropCSharpSerializer
    {
        List<byte[]> Serialize(List<object> dataList);
    }
    

    此代码是 Java 反列化程序的接口:This code is the interface for Java Deserializer:

    public interface ICustomizedInteropJavaDeserializer {
        public void prepare(String[] targetClassNames);
        public List<Object> Deserialize(List<ByteBuffer> dataList);
    }
    

SCP 主机模式SCP Host Mode

在此模式下,用户可以将代码编译为 DLL,以及使用 SCP 提供的 SCPHost.exe 来提交拓扑。In this mode, user can compile their codes to DLL, and use SCPHost.exe provided by SCP to submit topology. 规范文件下方代码所示:The spec file looks like this code:

(scp-spout
  {
    "plugin.name" "SCPHost.exe"
    "plugin.args" ["HelloWorld.dll" "Scp.App.HelloWorld.Generator" "Get"]
    "output.schema" {"default" ["sentence"]}
  })

其中,plugin.name 指定为 SCP SDK 提供的 SCPHost.exeHere, plugin.name is specified as SCPHost.exe provided by SCP SDK. SCPHost.exe 接受三个参数:SCPHost.exe accepts three parameters:

  1. 第一个参数是 DLL 名称,在本示例中为 "HelloWorld.dll"The first one is the DLL name, which is "HelloWorld.dll" in this example.
  2. 第二个参数是类名称,在本示例中为 "Scp.App.HelloWorld.Generator"The second one is the Class name, which is "Scp.App.HelloWorld.Generator" in this example.
  3. 第三个参数是公共静态方法的名称,调用此参数可获得 ISCPPlugin 的实例。The third one is the name of a public static method, which can be invoked to get an instance of ISCPPlugin.

在主机模式下,用户代码被编译为 DLL,并由 SCP 平台调用。In host mode, user code is compiled as DLL, and is invoked by SCP platform. 因此,SCP 平台可以全面控制整个处理逻辑。So SCP platform can get full control of the whole processing logic. 因此,我们建议客户在 SCP 主机模式下提交拓扑,因此这样做可以简化开发过程,获得更大的灵活性,以及对于以后版本的更好向后兼容性。So we recommend our customers to submit topology in SCP host mode since it can simplify the development experience and bring us more flexibility and better backward compatibility for later release as well.

SCP 编程示例SCP Programming Examples

HelloWorldHelloWorld

HelloWorld 是一个简单的 SCP.Net 编程示例。HelloWorld is a simple example to show a taste of SCP.Net. 它使用非事务性拓扑,带有一个名为 generator 的 Spout,以及两个分别名为 splitter 和 counter 的 Bolt。It uses a non-transactional topology, with a spout called generator, and two bolts called splitter and counter. Spout 生成器会随机生成一些句子,然后将生成的句子发送到 拆分器。The spout generator randomly generates sentences, and emit these sentences to splitter. Bolt 拆分器会将句子拆分为字词并将其发送到计数器 Bolt。The bolt splitter splits the sentences to words and emit these words to counter bolt. Bolt "counter" 使用字典记录每个字词出现的次数。The bolt "counter" uses a dictionary to record the occurrence number of each word.

在本示例中,有两个规范文件:HelloWorld.spec 和 HelloWorld_EnableAck.spec。There are two spec files, HelloWorld.spec and HelloWorld_EnableAck.spec for this example. 在 C# 代码中,可以通过从 Java 端获取 pluginConf 来确定是否已启用确认功能。In the C# code, it can find out whether ack is enabled by getting the pluginConf from Java side.

/* demo how to get pluginConf info */
if (Context.Config.pluginConf.ContainsKey(Constants.NONTRANSACTIONAL_ENABLE_ACK))
{
    enableAck = (bool)(Context.Config.pluginConf[Constants.NONTRANSACTIONAL_ENABLE_ACK]);
}
Context.Logger.Info("enableAck: {0}", enableAck);

在 Spout 中,如果启用了确认功能,会使用字典将未确认的元组存储在缓存中。In the spout, if ack is enabled, a dictionary is used to cache the tuples that have not been acknowledged. 如果调用了 Fail(),则会重新处理失败的元组:If Fail() is called, the failed tuple is replayed:

public void Fail(long seqId, Dictionary<string, Object> parms)
{
    Context.Logger.Info("Fail, seqId: {0}", seqId);
    if (cachedTuples.ContainsKey(seqId))
    {
        /* get the cached tuple */
        string sentence = cachedTuples[seqId];

        /* replay the failed tuple */
        Context.Logger.Info("Re-Emit: {0}, seqId: {1}", sentence, seqId);
        this.ctx.Emit(Constants.DEFAULT_STREAM_ID, new Values(sentence), seqId);
    }
    else
    {
        Context.Logger.Warn("Fail(), can't find cached tuple for seqId {0}!", seqId);
    }
}

HelloWorldTxHelloWorldTx

HelloWorldTx 示例展示如何实施事务性拓扑。The HelloWorldTx example demonstrates how to implement transactional topology. 它有一个名为生成器的 Spout、一个名为 partial-count 的批处理 Bolt 以及一个名为 count-sum 的提交 Bolt。It has one spout called generator, a batch bolt called partial-count, and a commit bolt called count-sum. 还有三个预先创建的 txt 文件:DataSource0.txtDataSource1.txtDataSource2.txtThere are also three pre-created txt files: DataSource0.txt, DataSource1.txt, and DataSource2.txt.

在每个事务中,Spout 生成器从预先创建的三个文件中随机选择两个文件,并将那两个文件的名称发送给 partial-count Bolt。In each transaction, the spout generator randomly selects two files from the pre-created three files, and emit the two file names to the partial-count bolt. Bolt partial-count 从接收到的元组获取文件名,然后打开文件并计算文件中的字词数量,最后将计算出的字词数量发送给 count-sum Bolt。The bolt partial-count gets the file name from the received tuple, then open the file and count the number of words in this file, and finally emit the word number to the count-sum bolt. count-sum Bolt 对总计数进行汇总。The count-sum bolt summarizes the total count.

为了获得 exactly once 语义,提交 Bolt count-sum 需要判断事务是否是重复处理的事务。To achieve exactly once semantics, the commit bolt count-sum need to judge whether it is a replayed transaction. 在本示例中,它具有静态成员变量:In this example, it has a static member variable:

public static long lastCommittedTxId = -1; 

创建 ISCPBatchBolt 实例后,它从输入参数获取 txAttemptWhen an ISCPBatchBolt instance is created, it gets the txAttempt from input parameters:

public static CountSum Get(Context ctx, Dictionary<string, Object> parms)
{
    /* for transactional topology, we can get txAttempt from the input parms */
    if (parms.ContainsKey(Constants.STORM_TX_ATTEMPT))
    {
        StormTxAttempt txAttempt = (StormTxAttempt)parms[Constants.STORM_TX_ATTEMPT];
        return new CountSum(ctx, txAttempt);
    }
    else
    {
        throw new Exception("null txAttempt");
    }
}

如果调用 FinishBatch(),在不是重复处理的事务的情况下,会更新 lastCommittedTxIdWhen FinishBatch() is called, the lastCommittedTxId will be updated if it is not a replayed transaction.

public void FinishBatch(Dictionary<string, Object> parms)
{
    /* judge whether it is a replayed transaction? */
    bool replay = (this.txAttempt.TxId <= lastCommittedTxId);

    if (!replay)
    {
        /* If it is not replayed, update the totalCount and lastCommittedTxId value */
        totalCount = totalCount + this.count;
        lastCommittedTxId = this.txAttempt.TxId;
    }
    … …
}

HybridTopologyHybridTopology

此拓扑包含一个 Java Spout 和一个 C# Bolt。This topology contains a Java Spout and a C# Bolt. 它使用 SCP 平台提供的默认序列化和反序列化实现方法。It uses the default serialization and deserialization implementation provided by SCP platform. 有关规范文件的详细信息,请参阅“示例\HybridTopology”文件夹中的 HybridTopology.spec;有关如何指定 Java classpath,请参阅 SubmitTopology.bat。See the HybridTopology.spec in examples\HybridTopology folder for the spec file details, and SubmitTopology.bat for how to specify Java classpath.

SCPHostDemoSCPHostDemo

本质上,本示例与 HelloWorld 相同。This example is the same as HelloWorld in essence. 唯一不同之处是,在本示例中,用户代码被编译为 DLL,而且使用 SCPHost.exe 提交拓扑。The only difference is that the user code is compiled as DLL and the topology is submitted by using SCPHost.exe. 有关更详细说明,请参阅“SCP 主机模式”部分。See the section "SCP Host Mode" for more detailed explanation.

后续步骤Next Steps

有关使用 SCP 创建的 Apache Storm 拓扑示例,请参阅以下文档:For examples of Apache Storm topologies created using SCP, see the following documents: