public class TopologyBuilder extends Object
TopologyBuilder exposes the Java API for specifying a topology for Storm to execute. Topologies are Thrift structures in the end, but since the Thrift API is so verbose, TopologyBuilder greatly eases the process of creating topologies. The template for creating and submitting a topology looks something like:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("1", new TestWordSpout(true), 5);
builder.setSpout("2", new TestWordSpout(true), 3);
builder.setBolt("3", new TestWordCounter(), 3)
.fieldsGrouping("1", new Fields("word"))
.fieldsGrouping("2", new Fields("word"));
builder.setBolt("4", new TestGlobalCount())
.globalGrouping("1");
Map conf = new HashMap();
conf.put(Config.TOPOLOGY_WORKERS, 4);
StormSubmitter.submitTopology("mytopology", conf, builder.createTopology());
Running the exact same topology in local mode (in process), and configuring it to log all tuples emitted, looks like the following. Note that it lets the topology run for 10 seconds before shutting down the local cluster.
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("1", new TestWordSpout(true), 5);
builder.setSpout("2", new TestWordSpout(true), 3);
builder.setBolt("3", new TestWordCounter(), 3)
.fieldsGrouping("1", new Fields("word"))
.fieldsGrouping("2", new Fields("word"));
builder.setBolt("4", new TestGlobalCount())
.globalGrouping("1");
Map conf = new HashMap();
conf.put(Config.TOPOLOGY_WORKERS, 4);
conf.put(Config.TOPOLOGY_DEBUG, true);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("mytopology", conf, builder.createTopology());
Utils.sleep(10000);
cluster.shutdown();
The pattern for TopologyBuilder
is to map component ids to components using the setSpout and setBolt methods. Those methods return objects that are then used to declare the inputs for that component.
Modifier and Type | Class and Description |
---|---|
protected class |
TopologyBuilder.BoltGetter |
protected class |
TopologyBuilder.ConfigGetter<T extends ComponentConfigurationDeclarer> |
protected class |
TopologyBuilder.SpoutGetter |
Constructor and Description |
---|
TopologyBuilder() |
Modifier and Type | Method and Description |
---|---|
void |
addWorkerHook(IWorkerHook workerHook)
Add a new worker lifecycle hook
|
StormTopology |
createTopology() |
BoltDeclarer |
setBolt(String id,
IBasicBolt bolt)
Define a new bolt in this topology.
|
BoltDeclarer |
setBolt(String id,
IBasicBolt bolt,
Number parallelism_hint)
Define a new bolt in this topology.
|
BoltDeclarer |
setBolt(String id,
IRichBolt bolt)
Define a new bolt in this topology with parallelism of just one thread.
|
BoltDeclarer |
setBolt(String id,
IRichBolt bolt,
Number parallelism_hint)
Define a new bolt in this topology with the specified amount of parallelism.
|
<T extends State> |
setBolt(String id,
IStatefulBolt<T> bolt)
Define a new bolt in this topology.
|
<T extends State> |
setBolt(String id,
IStatefulBolt<T> bolt,
Number parallelism_hint)
Define a new bolt in this topology.
|
<T extends State> |
setBolt(String id,
IStatefulWindowedBolt<T> bolt)
Define a new bolt in this topology.
|
<T extends State> |
setBolt(String id,
IStatefulWindowedBolt<T> bolt,
Number parallelism_hint)
Define a new bolt in this topology.
|
BoltDeclarer |
setBolt(String id,
IWindowedBolt bolt)
Define a new bolt in this topology.
|
BoltDeclarer |
setBolt(String id,
IWindowedBolt bolt,
Number parallelism_hint)
Define a new bolt in this topology.
|
SpoutDeclarer |
setSpout(String id,
IRichSpout spout)
Define a new spout in this topology.
|
SpoutDeclarer |
setSpout(String id,
IRichSpout spout,
Number parallelism_hint)
Define a new spout in this topology with the specified parallelism.
|
void |
setStateSpout(String id,
IRichStateSpout stateSpout) |
void |
setStateSpout(String id,
IRichStateSpout stateSpout,
Number parallelism_hint) |
public StormTopology createTopology()
public BoltDeclarer setBolt(String id, IRichBolt bolt) throws IllegalArgumentException
Define a new bolt in this topology with parallelism of just one thread.
id
- the id of this component. This id is referenced by other components that want to consume this bolt’s outputs.bolt
- the boltIllegalArgumentException
- if parallelism_hint
is not positivepublic BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism_hint) throws IllegalArgumentException
Define a new bolt in this topology with the specified amount of parallelism.
id
- the id of this component. This id is referenced by other components that want to consume this bolt’s outputs.bolt
- the boltparallelism_hint
- the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somewhere around the cluster.IllegalArgumentException
- if parallelism_hint
is not positivepublic BoltDeclarer setBolt(String id, IBasicBolt bolt) throws IllegalArgumentException
Define a new bolt in this topology. This defines a basic bolt, which is a simpler to use but more restricted kind of bolt. Basic bolts are intended for non-aggregation processing and automate the anchoring/acking process to achieve proper reliability in the topology.
id
- the id of this component. This id is referenced by other components that want to consume this bolt’s outputs.bolt
- the basic boltIllegalArgumentException
- if parallelism_hint
is not positivepublic BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelism_hint) throws IllegalArgumentException
Define a new bolt in this topology. This defines a basic bolt, which is a simpler to use but more restricted kind of bolt. Basic bolts are intended for non-aggregation processing and automate the anchoring/acking process to achieve proper reliability in the topology.
id
- the id of this component. This id is referenced by other components that want to consume this bolt’s outputs.bolt
- the basic boltparallelism_hint
- the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somewhere around the cluster.IllegalArgumentException
- if parallelism_hint
is not positivepublic BoltDeclarer setBolt(String id, IWindowedBolt bolt) throws IllegalArgumentException
Define a new bolt in this topology. This defines a windowed bolt, intended for windowing operations. The IWindowedBolt.execute(TupleWindow)
method is triggered for each window interval with the list of current events in the window.
id
- the id of this component. This id is referenced by other components that want to consume this bolt’s outputs.bolt
- the windowed boltIllegalArgumentException
- if parallelism_hint
is not positivepublic BoltDeclarer setBolt(String id, IWindowedBolt bolt, Number parallelism_hint) throws IllegalArgumentException
Define a new bolt in this topology. This defines a windowed bolt, intended for windowing operations. The IWindowedBolt.execute(TupleWindow)
method is triggered for each window interval with the list of current events in the window.
id
- the id of this component. This id is referenced by other components that want to consume this bolt’s outputs.bolt
- the windowed boltparallelism_hint
- the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somwehere around the cluster.IllegalArgumentException
- if parallelism_hint
is not positivepublic <T extends State> BoltDeclarer setBolt(String id, IStatefulBolt<T> bolt) throws IllegalArgumentException
Define a new bolt in this topology. This defines a stateful bolt, that requires its state (of computation) to be saved. When this bolt is initialized, the IStatefulComponent.initState(State)
method is invoked after IStatefulBolt.prepare(Map, TopologyContext, OutputCollector)
but before IStatefulBolt.execute(Tuple)
with its previously saved state.
The framework provides at-least once guarantee for the state updates. Bolts (both stateful and non-stateful) in a stateful topology are expected to anchor the tuples while emitting and ack the input tuples once its processed.
id
- the id of this component. This id is referenced by other components that want to consume this bolt’s outputs.bolt
- the stateful boltIllegalArgumentException
- if parallelism_hint
is not positivepublic <T extends State> BoltDeclarer setBolt(String id, IStatefulBolt<T> bolt, Number parallelism_hint) throws IllegalArgumentException
Define a new bolt in this topology. This defines a stateful bolt, that requires its state (of computation) to be saved. When this bolt is initialized, the IStatefulComponent.initState(State)
method is invoked after IStatefulBolt.prepare(Map, TopologyContext, OutputCollector)
but before IStatefulBolt.execute(Tuple)
with its previously saved state.
The framework provides at-least once guarantee for the state updates. Bolts (both stateful and non-stateful) in a stateful topology are expected to anchor the tuples while emitting and ack the input tuples once its processed.
id
- the id of this component. This id is referenced by other components that want to consume this bolt’s outputs.bolt
- the stateful boltparallelism_hint
- the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somwehere around the cluster.IllegalArgumentException
- if parallelism_hint
is not positivepublic <T extends State> BoltDeclarer setBolt(String id, IStatefulWindowedBolt<T> bolt) throws IllegalArgumentException
Define a new bolt in this topology. This defines a stateful windowed bolt, intended for stateful windowing operations. The IWindowedBolt.execute(TupleWindow)
method is triggered for each window interval with the list of current events in the window. During initialization of this bolt IStatefulComponent.initState(State)
is invoked with its previously saved state.
id
- the id of this component. This id is referenced by other components that want to consume this bolt’s outputs.bolt
- the stateful windowed boltT
- the type of the state (e.g. KeyValueState
)IllegalArgumentException
- if parallelism_hint
is not positivepublic <T extends State> BoltDeclarer setBolt(String id, IStatefulWindowedBolt<T> bolt, Number parallelism_hint) throws IllegalArgumentException
Define a new bolt in this topology. This defines a stateful windowed bolt, intended for stateful windowing operations. The IWindowedBolt.execute(TupleWindow)
method is triggered for each window interval with the list of current events in the window. During initialization of this bolt IStatefulComponent.initState(State)
is invoked with its previously saved state.
id
- the id of this component. This id is referenced by other components that want to consume this bolt’s outputs.bolt
- the stateful windowed boltparallelism_hint
- the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somwehere around the cluster.T
- the type of the state (e.g. KeyValueState
)IllegalArgumentException
- if parallelism_hint
is not positivepublic SpoutDeclarer setSpout(String id, IRichSpout spout) throws IllegalArgumentException
Define a new spout in this topology.
id
- the id of this component. This id is referenced by other components that want to consume this spout’s outputs.spout
- the spoutIllegalArgumentException
- if parallelism_hint
is not positivepublic SpoutDeclarer setSpout(String id, IRichSpout spout, Number parallelism_hint) throws IllegalArgumentException
Define a new spout in this topology with the specified parallelism. If the spout declares itself as non-distributed, the parallelism_hint will be ignored and only one task will be allocated to this component.
id
- the id of this component. This id is referenced by other components that want to consume this spout’s outputs.parallelism_hint
- the number of tasks that should be assigned to execute this spout. Each task will run on a thread in a process somewhere around the cluster.spout
- the spoutIllegalArgumentException
- if parallelism_hint
is not positivepublic void setStateSpout(String id, IRichStateSpout stateSpout) throws IllegalArgumentException
IllegalArgumentException
public void setStateSpout(String id, IRichStateSpout stateSpout, Number parallelism_hint) throws IllegalArgumentException
IllegalArgumentException
public void addWorkerHook(IWorkerHook workerHook)
Add a new worker lifecycle hook
workerHook
- the lifecycle hook to addCopyright © 2019 The Apache Software Foundation. All Rights Reserved.