backtype.storm.topology
Class TopologyBuilder

java.lang.Object
  extended by backtype.storm.topology.TopologyBuilder

public class TopologyBuilder
extends java.lang.Object

TopologyBuilder exposes the Java API for specifying a topology for Storm to execute. Topologies are Thrift structures in the end, but since the Thrift API is so verbose, TopologyBuilder greatly eases the process of creating topologies. The template for creating and submitting a topology looks something like:

 TopologyBuilder builder = new TopologyBuilder();

 builder.setSpout("1", new TestWordSpout(true), 5);
 builder.setSpout("2", new TestWordSpout(true), 3);
 builder.setBolt("3", new TestWordCounter(), 3)
          .fieldsGrouping("1", new Fields("word"))
          .fieldsGrouping("2", new Fields("word"));
 builder.setBolt("4", new TestGlobalCount())
          .globalGrouping("1");

 Map conf = new HashMap();
 conf.put(Config.TOPOLOGY_WORKERS, 4);
 
 StormSubmitter.submitTopology("mytopology", conf, builder.createTopology());
 
Running the exact same topology in local mode (in process), with logging of all emitted tuples enabled, looks like the following. Note that it lets the topology run for 10 seconds before shutting down the local cluster.

 TopologyBuilder builder = new TopologyBuilder();

 builder.setSpout("1", new TestWordSpout(true), 5);
 builder.setSpout("2", new TestWordSpout(true), 3);
 builder.setBolt("3", new TestWordCounter(), 3)
          .fieldsGrouping("1", new Fields("word"))
          .fieldsGrouping("2", new Fields("word"));
 builder.setBolt("4", new TestGlobalCount())
          .globalGrouping("1");

 Map conf = new HashMap();
 conf.put(Config.TOPOLOGY_WORKERS, 4);
 conf.put(Config.TOPOLOGY_DEBUG, true);

 LocalCluster cluster = new LocalCluster();
 cluster.submitTopology("mytopology", conf, builder.createTopology());
 Utils.sleep(10000);
 cluster.shutdown();
 

The pattern for TopologyBuilder is to map component ids to components using the setSpout and setBolt methods. Those methods return objects that are then used to declare the inputs for that component.
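
The id-to-declarer pattern can be sketched without Storm on the classpath. The classes below (MiniTopologyBuilder, MiniBoltDeclarer) are hypothetical stand-ins, not Storm API. They only record which component ids exist and which inputs each bolt declares, to show why setBolt returns an object that is then chained with grouping calls. The duplicate-id check is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for BoltDeclarer: it only records declared inputs.
class MiniBoltDeclarer {
    private final List<String> inputs;

    MiniBoltDeclarer(List<String> inputs) {
        this.inputs = inputs;
    }

    // Records that this bolt consumes componentId via a fields grouping.
    MiniBoltDeclarer fieldsGrouping(String componentId, String... fields) {
        inputs.add(componentId + ":fields" + Arrays.toString(fields));
        return this; // returning this allows chained input declarations
    }

    // Records that this bolt consumes componentId via a global grouping.
    MiniBoltDeclarer globalGrouping(String componentId) {
        inputs.add(componentId + ":global");
        return this;
    }
}

// Hypothetical stand-in for TopologyBuilder: maps component ids to hints and inputs.
class MiniTopologyBuilder {
    final Map<String, Integer> parallelism = new LinkedHashMap<>();
    final Map<String, List<String>> inputs = new LinkedHashMap<>();

    // In this sketch spouts return nothing, since they declare no inputs
    // (Storm's real setSpout returns a SpoutDeclarer).
    void setSpout(String id, int parallelismHint) {
        register(id, parallelismHint);
    }

    MiniBoltDeclarer setBolt(String id, int parallelismHint) {
        return new MiniBoltDeclarer(register(id, parallelismHint));
    }

    // Assumption of this sketch: reusing a component id is rejected.
    private List<String> register(String id, int parallelismHint) {
        if (parallelism.containsKey(id)) {
            throw new IllegalArgumentException("duplicate component id: " + id);
        }
        parallelism.put(id, parallelismHint);
        List<String> componentInputs = new ArrayList<>();
        inputs.put(id, componentInputs);
        return componentInputs;
    }
}
```

Wiring the same shape as the template above, b.setBolt("3", 3).fieldsGrouping("1", "word").fieldsGrouping("2", "word") leaves b.inputs.get("3") printing as [1:fields[word], 2:fields[word]].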


Nested Class Summary
protected  class TopologyBuilder.BoltGetter
           
protected  class TopologyBuilder.ConfigGetter<T extends ComponentConfigurationDeclarer>
           
protected  class TopologyBuilder.SpoutGetter
           
 
Constructor Summary
TopologyBuilder()
           
 
Method Summary
 StormTopology createTopology()
           
 BoltDeclarer setBolt(java.lang.String id, IBasicBolt bolt)
          Define a new bolt in this topology.
 BoltDeclarer setBolt(java.lang.String id, IBasicBolt bolt, java.lang.Integer parallelism_hint)
          Define a new bolt in this topology.
 BoltDeclarer setBolt(java.lang.String id, IRichBolt bolt)
          Define a new bolt in this topology with parallelism of just one thread.
 BoltDeclarer setBolt(java.lang.String id, IRichBolt bolt, java.lang.Integer parallelism_hint)
          Define a new bolt in this topology with the specified amount of parallelism.
 SpoutDeclarer setSpout(java.lang.String id, IRichSpout spout)
          Define a new spout in this topology.
 SpoutDeclarer setSpout(java.lang.String id, IRichSpout spout, java.lang.Integer parallelism_hint)
          Define a new spout in this topology with the specified parallelism.
 void setStateSpout(java.lang.String id, IRichStateSpout stateSpout)
           
 void setStateSpout(java.lang.String id, IRichStateSpout stateSpout, java.lang.Integer parallelism_hint)
           
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Constructor Detail

TopologyBuilder

public TopologyBuilder()
Method Detail

createTopology

public StormTopology createTopology()

setBolt

public BoltDeclarer setBolt(java.lang.String id,
                            IRichBolt bolt)
Define a new bolt in this topology with parallelism of just one thread.

Parameters:
id - the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
bolt - the bolt
Returns:
use the returned object to declare the inputs to this component

setBolt

public BoltDeclarer setBolt(java.lang.String id,
                            IRichBolt bolt,
                            java.lang.Integer parallelism_hint)
Define a new bolt in this topology with the specified amount of parallelism.

Parameters:
id - the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
bolt - the bolt
parallelism_hint - the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somewhere around the cluster.
Returns:
use the returned object to declare the inputs to this component

setBolt

public BoltDeclarer setBolt(java.lang.String id,
                            IBasicBolt bolt)
Define a new bolt in this topology. This defines a basic bolt, which is a simpler-to-use but more restricted kind of bolt. Basic bolts are intended for non-aggregation processing and automate the anchoring/acking process to achieve proper reliability in the topology.

Parameters:
id - the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
bolt - the basic bolt
Returns:
use the returned object to declare the inputs to this component

setBolt

public BoltDeclarer setBolt(java.lang.String id,
                            IBasicBolt bolt,
                            java.lang.Integer parallelism_hint)
Define a new bolt in this topology. This defines a basic bolt, which is a simpler-to-use but more restricted kind of bolt. Basic bolts are intended for non-aggregation processing and automate the anchoring/acking process to achieve proper reliability in the topology.

Parameters:
id - the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
bolt - the basic bolt
parallelism_hint - the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somewhere around the cluster.
Returns:
use the returned object to declare the inputs to this component
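
What "automate the anchoring/acking process" means can be illustrated with a toy wrapper. Everything below (ToyTuple, ToyCollector, ToyBasicBolt, ToyBasicBoltExecutor) is hypothetical and much simpler than Storm's own classes: the basic bolt only receives an emit function, and the wrapper anchors every emitted value to the current input and acks that input once execute returns. Failure handling is deliberately left out of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical input tuple carrying a single string value.
class ToyTuple {
    final String value;

    ToyTuple(String value) {
        this.value = value;
    }
}

// Hypothetical collector that logs the reliability-related calls it receives.
class ToyCollector {
    final List<String> log = new ArrayList<>();

    void emit(ToyTuple anchor, String value) {
        log.add("emit '" + value + "' anchored to '" + anchor.value + "'");
    }

    void ack(ToyTuple input) {
        log.add("ack '" + input.value + "'");
    }
}

// A basic bolt sees only an emit function; it never anchors or acks itself.
interface ToyBasicBolt {
    void execute(ToyTuple input, Consumer<String> emit);
}

// Wraps a basic bolt and performs the anchoring and acking on its behalf.
class ToyBasicBoltExecutor {
    private final ToyBasicBolt bolt;
    private final ToyCollector collector;

    ToyBasicBoltExecutor(ToyBasicBolt bolt, ToyCollector collector) {
        this.bolt = bolt;
        this.collector = collector;
    }

    void execute(ToyTuple input) {
        // Every value the bolt emits is anchored to the tuple being processed.
        bolt.execute(input, value -> collector.emit(input, value));
        // The input is acked only after execute() has returned normally.
        collector.ack(input);
    }
}
```

Running a word-splitting bolt such as (in, emit) -> { for (String w : in.value.split(" ")) emit.accept(w); } on the input "a b" logs two emits anchored to 'a b' followed by one ack of 'a b'. Because each input is acked as soon as it has been processed, this style fits per-tuple processing; an aggregation that must hold inputs until a later point does not fit it.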

setSpout

public SpoutDeclarer setSpout(java.lang.String id,
                              IRichSpout spout)
Define a new spout in this topology.

Parameters:
id - the id of this component. This id is referenced by other components that want to consume this spout's outputs.
spout - the spout

setSpout

public SpoutDeclarer setSpout(java.lang.String id,
                              IRichSpout spout,
                              java.lang.Integer parallelism_hint)
Define a new spout in this topology with the specified parallelism. If the spout declares itself as non-distributed, the parallelism_hint will be ignored and only one task will be allocated to this component.

Parameters:
id - the id of this component. This id is referenced by other components that want to consume this spout's outputs.
spout - the spout
parallelism_hint - the number of tasks that should be assigned to execute this spout. Each task will run on a thread in a process somewhere around the cluster.
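
The parallelism rules stated on this page (one thread for the setBolt overload without a hint, parallelism_hint tasks otherwise, and a single task for a spout that declares itself non-distributed) can be condensed into a small helper. ParallelismRules is hypothetical and not part of Storm. Treating a missing hint as one task for spouts as well, and rejecting hints below 1, are assumptions of this sketch.

```java
// Hypothetical helper, not part of Storm: the task count a component would get
// under the rules described in this documentation.
final class ParallelismRules {
    private ParallelismRules() {
    }

    static int effectiveTasks(Integer parallelismHint, boolean nonDistributedSpout) {
        if (nonDistributedSpout) {
            return 1; // the hint is ignored for a non-distributed spout
        }
        if (parallelismHint == null) {
            return 1; // assumption: an omitted hint means a single task
        }
        if (parallelismHint < 1) {
            // assumption of this sketch: non-positive hints are rejected
            throw new IllegalArgumentException("parallelism_hint must be >= 1");
        }
        return parallelismHint;
    }
}
```

For example, a spout registered with a parallelism_hint of 5 that declares itself non-distributed still ends up with a single task, so effectiveTasks(5, true) returns 1.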

setStateSpout

public void setStateSpout(java.lang.String id,
                          IRichStateSpout stateSpout)

setStateSpout

public void setStateSpout(java.lang.String id,
                          IRichStateSpout stateSpout,
                          java.lang.Integer parallelism_hint)