Package org.apache.storm
Class Testing
- java.lang.Object
-
- org.apache.storm.Testing
-
public class Testing extends Object
A utility that helps with testing topologies, Bolts and Spouts.
-
-
Nested Class Summary
Nested Classes Modifier and Type Class Description static class
Testing.CapturedTopology<T>
A topology that has all messages captured and can be read later on.static interface
Testing.Condition
Simply produces a boolean to see if a specific state is true or false.
-
Field Summary
Fields Modifier and Type Field Description static int
TEST_TIMEOUT_MS
The default amount of wall time should be spent waiting for specific conditions to happen.
-
Constructor Summary
Constructors Constructor Description Testing()
-
Method Summary
All Methods Static Methods Concrete Methods Deprecated Methods Modifier and Type Method Description static void
advanceClusterTime(ILocalCluster cluster, Integer secs)
Simulated time wait for a cluster.static void
advanceClusterTime(ILocalCluster cluster, Integer secs, Integer step)
Simulated time wait for a cluster.static Testing.CapturedTopology<StormTopology>
captureTopology(StormTopology topology)
Rewrites a topology so that all the tuples flowing through it are captured.static Map<String,List<FixedTuple>>
completeTopology(ILocalCluster cluster, StormTopology topology)
Run a topology to completion capturing all of the messages that are emitted.static Map<String,List<FixedTuple>>
completeTopology(ILocalCluster cluster, StormTopology topology, CompleteTopologyParam param)
Run a topology to completion capturing all of the messages that are emitted.static ILocalCluster
getLocalCluster(Map<String,Object> clusterConf)
Deprecated.use ``` try (LocalCluster cluster = new LocalCluster.Builder()....build()) { ...static int
globalAmt(String id, String key)
Deprecated.static <T> boolean
isEvery(Collection<T> data, Predicate<T> pred)
Convenience method for data.stream.allMatch(pred).static TrackedTopology
mkTrackedTopology(ILocalCluster cluster, StormTopology topology)
Deprecated.useTrackedTopology
directly.static <T> Map<T,Integer>
multiset(Collection<T> c)
Count how many times each element appears in the Collection.static <T> boolean
multiseteq(Collection<T> a, Collection<T> b)
Check if two collections are equivalent ignoring the order of elements.static List<List<Object>>
readTuples(Map<String,List<FixedTuple>> results, String componentId)
Get all of the tuples from a given component on the default stream.static List<List<Object>>
readTuples(Map<String,List<FixedTuple>> results, String componentId, String streamId)
Get all of the tuples from a given component on a given stream.static void
simulateWait(ILocalCluster cluster)
If using simulated time simulate waiting for 10 seconds.static Tuple
testTuple(List<Object> values)
Create aTuple
for use with testing.static Tuple
testTuple(List<Object> values, MkTupleParam param)
Create aTuple
for use with testing.static Testing.CapturedTopology<TrackedTopology>
trackAndCaptureTopology(ILocalCluster cluster, StormTopology topology)
Track and capture a topology.static void
trackedWait(Testing.CapturedTopology<TrackedTopology> topo)
Simulated time wait for a tracked topology.static void
trackedWait(Testing.CapturedTopology<TrackedTopology> topo, Integer amt)
Simulated time wait for a tracked topology.static void
trackedWait(Testing.CapturedTopology<TrackedTopology> topo, Integer amt, Integer timeoutMs)
Simulated time wait for a tracked topology.static void
trackedWait(TrackedTopology topo)
Simulated time wait for a tracked topology.static void
trackedWait(TrackedTopology topo, Integer amt)
Simulated time wait for a tracked topology.static void
trackedWait(TrackedTopology topo, Integer amt, Integer timeoutMs)
Simulated time wait for a tracked topology.static void
whileTimeout(long timeoutMs, Testing.Condition condition, Runnable body)
Continue to execute body repeatedly until condition is true or TEST_TIMEOUT_MS has passed.static void
whileTimeout(Testing.Condition condition, Runnable body)
Continue to execute body repeatedly until condition is true or TEST_TIMEOUT_MS has passed.static void
withLocalCluster(MkClusterParam param, TestJob code)
Deprecated.use ``` try (LocalCluster cluster = new LocalCluster.Builder()....build()) { ...static void
withLocalCluster(TestJob code)
Deprecated.use ``` try (LocalCluster cluster = new LocalCluster()) { ...static void
withSimulatedTime(Runnable code)
Deprecated.use ``` try (Time.SimulatedTime time = new Time.SimulatedTime()) { ...static void
withSimulatedTimeLocalCluster(MkClusterParam param, TestJob code)
Deprecated.use ``` try (LocalCluster cluster = new LocalCluster.Builder().withSimulatedTime()....build()) { ...static void
withSimulatedTimeLocalCluster(TestJob code)
Deprecated.use ``` try (LocalCluster cluster = new LocalCluster.Builder().withSimulatedTime().build()) { ...static void
withTrackedCluster(MkClusterParam param, TestJob code)
Deprecated.use ``` try (LocalCluster cluster = new LocalCluster.Builder().withTracked()....build()) { ...static void
withTrackedCluster(TestJob code)
Deprecated.use ``` try (LocalCluster cluster = new LocalCluster.Builder().withTracked().build()) { ...
-
-
-
Method Detail
-
whileTimeout
public static void whileTimeout(Testing.Condition condition, Runnable body)
Continue to execute body repeatedly until condition is true or TEST_TIMEOUT_MS has passed.- Parameters:
condition
- what we are waiting forbody
- what to run in the loop- Throws:
AssertionError
- if the loop timed out.
-
whileTimeout
public static void whileTimeout(long timeoutMs, Testing.Condition condition, Runnable body)
Continue to execute body repeatedly until condition is true or TEST_TIMEOUT_MS has passed.- Parameters:
timeoutMs
- the number of ms to wait before timing out.condition
- what we are waiting forbody
- what to run in the loop- Throws:
AssertionError
- if the loop timed out.
-
isEvery
public static <T> boolean isEvery(Collection<T> data, Predicate<T> pred)
Convenience method for data.stream.allMatch(pred).
-
withSimulatedTime
@Deprecated public static void withSimulatedTime(Runnable code)
Deprecated.use ``` try (Time.SimulatedTime time = new Time.SimulatedTime()) { ... } ```Run with simulated time.- Parameters:
code
- what to run
-
withLocalCluster
@Deprecated public static void withLocalCluster(TestJob code)
Deprecated.use ``` try (LocalCluster cluster = new LocalCluster()) { ... } ```Run with a local cluster.- Parameters:
code
- what to run
-
withLocalCluster
@Deprecated public static void withLocalCluster(MkClusterParam param, TestJob code)
Deprecated.use ``` try (LocalCluster cluster = new LocalCluster.Builder()....build()) { ... } ```Run with a local cluster.- Parameters:
param
- configs to set in the clustercode
- what to run
-
getLocalCluster
@Deprecated public static ILocalCluster getLocalCluster(Map<String,Object> clusterConf)
Deprecated.use ``` try (LocalCluster cluster = new LocalCluster.Builder()....build()) { ... } ```Run with a local cluster.- Parameters:
clusterConf
- some configs to set in the cluster
-
withSimulatedTimeLocalCluster
@Deprecated public static void withSimulatedTimeLocalCluster(TestJob code)
Deprecated.use ``` try (LocalCluster cluster = new LocalCluster.Builder().withSimulatedTime().build()) { ... } ```Run with a local cluster.- Parameters:
code
- what to run
-
withSimulatedTimeLocalCluster
@Deprecated public static void withSimulatedTimeLocalCluster(MkClusterParam param, TestJob code)
Deprecated.use ``` try (LocalCluster cluster = new LocalCluster.Builder().withSimulatedTime()....build()) { ... } ```Run with a local cluster.- Parameters:
param
- configs to set in the clustercode
- what to run
-
withTrackedCluster
@Deprecated public static void withTrackedCluster(TestJob code)
Deprecated.use ``` try (LocalCluster cluster = new LocalCluster.Builder().withTracked().build()) { ... } ```Run with a local cluster.- Parameters:
code
- what to run
-
withTrackedCluster
@Deprecated public static void withTrackedCluster(MkClusterParam param, TestJob code)
Deprecated.use ``` try (LocalCluster cluster = new LocalCluster.Builder().withTracked()....build()) { ... } ```Run with a local tracked cluster.- Parameters:
param
- configs to set in the clustercode
- what to run
-
globalAmt
@Deprecated public static int globalAmt(String id, String key)
Deprecated.In a tracked topology some metrics are tracked. This provides a way to get those metrics. This is intended mostly for internal testing.- Parameters:
id
- the id of the tracked clusterkey
- the name of the metric to get.- Returns:
- the metric
-
trackAndCaptureTopology
public static Testing.CapturedTopology<TrackedTopology> trackAndCaptureTopology(ILocalCluster cluster, StormTopology topology)
Track and capture a topology. This is intended mostly for internal testing.
-
captureTopology
public static Testing.CapturedTopology<StormTopology> captureTopology(StormTopology topology)
Rewrites a topology so that all the tuples flowing through it are captured.- Parameters:
topology
- the topology to rewrite- Returns:
- the modified topology and a new Bolt that can retrieve the captured tuples.
-
completeTopology
public static Map<String,List<FixedTuple>> completeTopology(ILocalCluster cluster, StormTopology topology) throws InterruptedException, org.apache.storm.thrift.TException
Run a topology to completion capturing all of the messages that are emitted. This only works when all of the spouts are instances ofCompletableSpout
.- Parameters:
cluster
- the cluster to submit the topology totopology
- the topology itself- Returns:
- a map of the component to the list of tuples it emitted
- Throws:
org.apache.storm.thrift.TException
- on any error from nimbusInterruptedException
-
completeTopology
public static Map<String,List<FixedTuple>> completeTopology(ILocalCluster cluster, StormTopology topology, CompleteTopologyParam param) throws org.apache.storm.thrift.TException, InterruptedException
Run a topology to completion capturing all of the messages that are emitted. This only works when all of the spouts are instances ofCompletableSpout
or are overwritten by MockedSources in param- Parameters:
cluster
- the cluster to submit the topology totopology
- the topology itselfparam
- parameters to describe how to complete a topology- Returns:
- a map of the component to the list of tuples it emitted
- Throws:
org.apache.storm.thrift.TException
- on any error from nimbus.InterruptedException
-
simulateWait
public static void simulateWait(ILocalCluster cluster) throws InterruptedException
If using simulated time simulate waiting for 10 seconds. This is intended for internal testing only.- Throws:
InterruptedException
-
readTuples
public static List<List<Object>> readTuples(Map<String,List<FixedTuple>> results, String componentId)
Get all of the tuples from a given component on the default stream.- Parameters:
results
- the results of running a completed topologycomponentId
- the id of the component to look at- Returns:
- a list of the tuple values.
-
readTuples
public static List<List<Object>> readTuples(Map<String,List<FixedTuple>> results, String componentId, String streamId)
Get all of the tuples from a given component on a given stream.- Parameters:
results
- the results of running a completed topologycomponentId
- the id of the component to look atstreamId
- the id of the stream to look for.- Returns:
- a list of the tuple values.
-
mkTrackedTopology
@Deprecated public static TrackedTopology mkTrackedTopology(ILocalCluster cluster, StormTopology topology)
Deprecated.useTrackedTopology
directly.Create a tracked topology.
-
trackedWait
public static void trackedWait(Testing.CapturedTopology<TrackedTopology> topo)
Simulated time wait for a tracked topology. This is intended for internal testing.
-
trackedWait
public static void trackedWait(Testing.CapturedTopology<TrackedTopology> topo, Integer amt)
Simulated time wait for a tracked topology. This is intended for internal testing.
-
trackedWait
public static void trackedWait(Testing.CapturedTopology<TrackedTopology> topo, Integer amt, Integer timeoutMs)
Simulated time wait for a tracked topology. This is intended for internal testing.
-
trackedWait
public static void trackedWait(TrackedTopology topo)
Simulated time wait for a tracked topology. This is intended for internal testing.
-
trackedWait
public static void trackedWait(TrackedTopology topo, Integer amt)
Simulated time wait for a tracked topology. This is intended for internal testing.
-
trackedWait
public static void trackedWait(TrackedTopology topo, Integer amt, Integer timeoutMs)
Simulated time wait for a tracked topology. This is intended for internal testing.
-
advanceClusterTime
public static void advanceClusterTime(ILocalCluster cluster, Integer secs) throws InterruptedException
Simulated time wait for a cluster. This is intended for internal testing.- Throws:
InterruptedException
-
advanceClusterTime
public static void advanceClusterTime(ILocalCluster cluster, Integer secs, Integer step) throws InterruptedException
Simulated time wait for a cluster. This is intended for internal testing.- Throws:
InterruptedException
-
multiset
public static <T> Map<T,Integer> multiset(Collection<T> c)
Count how many times each element appears in the Collection.- Parameters:
c
- a collection of values- Returns:
- a map of the unique values in c to the count of those values.
-
multiseteq
public static <T> boolean multiseteq(Collection<T> a, Collection<T> b)
Check if two collections are equivalent ignoring the order of elements.
-
testTuple
public static Tuple testTuple(List<Object> values)
Create aTuple
for use with testing.- Parameters:
values
- the values to appear in the tuple
-
testTuple
public static Tuple testTuple(List<Object> values, MkTupleParam param)
Create aTuple
for use with testing.- Parameters:
values
- the values to appear in the tupleparam
- parametrs describing more details about the tuple
-
-