public class Testing extends Object
A utility that helps with testing topologies, Bolts and Spouts.
Modifier and Type | Class and Description |
---|---|
static class | Testing.CapturedTopology<T>: A topology that has all messages captured and can be read later on. |
static interface | Testing.Condition: Simply produces a boolean to see if a specific state is true or false. |
Modifier and Type | Field and Description |
---|---|
static int | TEST_TIMEOUT_MS: The default amount of wall-clock time to spend waiting for a specific condition to occur. |
Constructor and Description |
---|
Testing() |
Modifier and Type | Method and Description |
---|---|
static void | advanceClusterTime(ILocalCluster cluster, Integer secs): Simulated time wait for a cluster. |
static void | advanceClusterTime(ILocalCluster cluster, Integer secs, Integer step): Simulated time wait for a cluster. |
static Testing.CapturedTopology<StormTopology> | captureTopology(StormTopology topology): Rewrites a topology so that all the tuples flowing through it are captured. |
static Map<String,List<FixedTuple>> | completeTopology(ILocalCluster cluster, StormTopology topology): Run a topology to completion capturing all of the messages that are emitted. |
static Map<String,List<FixedTuple>> | completeTopology(ILocalCluster cluster, StormTopology topology, CompleteTopologyParam param): Run a topology to completion capturing all of the messages that are emitted. |
static ILocalCluster | getLocalCluster(Map<String,Object> clusterConf): Deprecated. Use try (LocalCluster cluster = new LocalCluster.Builder()....build()) { ... } |
static int | globalAmt(String id, String key): Deprecated. |
static <T> boolean | isEvery(Collection<T> data, Predicate<T> pred): Convenience method for data.stream().allMatch(pred). |
static TrackedTopology | mkTrackedTopology(ILocalCluster cluster, StormTopology topology): Deprecated. Use TrackedTopology directly. |
static <T> Map<T,Integer> | multiset(Collection<T> c): Count how many times each element appears in the Collection. |
static <T> boolean | multiseteq(Collection<T> a, Collection<T> b): Check if two collections are equivalent ignoring the order of elements. |
static List<List<Object>> | readTuples(Map<String,List<FixedTuple>> results, String componentId): Get all of the tuples from a given component on the default stream. |
static List<List<Object>> | readTuples(Map<String,List<FixedTuple>> results, String componentId, String streamId): Get all of the tuples from a given component on a given stream. |
static void | simulateWait(ILocalCluster cluster): If using simulated time, simulate waiting for 10 seconds. |
static Tuple | testTuple(List<Object> values): Create a Tuple for use with testing. |
static Tuple | testTuple(List<Object> values, MkTupleParam param): Create a Tuple for use with testing. |
static Testing.CapturedTopology<TrackedTopology> | trackAndCaptureTopology(ILocalCluster cluster, StormTopology topology): Track and capture a topology. |
static void | trackedWait(Testing.CapturedTopology<TrackedTopology> topo): Simulated time wait for a tracked topology. |
static void | trackedWait(Testing.CapturedTopology<TrackedTopology> topo, Integer amt): Simulated time wait for a tracked topology. |
static void | trackedWait(Testing.CapturedTopology<TrackedTopology> topo, Integer amt, Integer timeoutMs): Simulated time wait for a tracked topology. |
static void | trackedWait(TrackedTopology topo): Simulated time wait for a tracked topology. |
static void | trackedWait(TrackedTopology topo, Integer amt): Simulated time wait for a tracked topology. |
static void | trackedWait(TrackedTopology topo, Integer amt, Integer timeoutMs): Simulated time wait for a tracked topology. |
static void | whileTimeout(long timeoutMs, Testing.Condition condition, Runnable body): Continue to execute body repeatedly until condition is true or timeoutMs has passed. |
static void | whileTimeout(Testing.Condition condition, Runnable body): Continue to execute body repeatedly until condition is true or TEST_TIMEOUT_MS has passed. |
static void | withLocalCluster(MkClusterParam param, TestJob code): Deprecated. Use try (LocalCluster cluster = new LocalCluster.Builder()....build()) { ... } |
static void | withLocalCluster(TestJob code): Deprecated. Use try (LocalCluster cluster = new LocalCluster()) { ... } |
static void | withSimulatedTime(Runnable code): Deprecated. Use try (Time.SimulatedTime time = new Time.SimulatedTime()) { ... } |
static void | withSimulatedTimeLocalCluster(MkClusterParam param, TestJob code): Deprecated. Use try (LocalCluster cluster = new LocalCluster.Builder().withSimulatedTime()....build()) { ... } |
static void | withSimulatedTimeLocalCluster(TestJob code): Deprecated. Use try (LocalCluster cluster = new LocalCluster.Builder().withSimulatedTime().build()) { ... } |
static void | withTrackedCluster(MkClusterParam param, TestJob code): Deprecated. Use try (LocalCluster cluster = new LocalCluster.Builder().withTracked()....build()) { ... } |
static void | withTrackedCluster(TestJob code): Deprecated. Use try (LocalCluster cluster = new LocalCluster.Builder().withTracked().build()) { ... } |
public static final int TEST_TIMEOUT_MS
The default amount of wall-clock time to spend waiting for a specific condition to occur. The default is 10 seconds unless the environment variable STORM_TEST_TIMEOUT_MS is set.
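To make the default-versus-override rule concrete, here is a minimal plain-Java sketch of how such a value might be resolved. It does not call Storm. The class and method names (TestTimeoutSketch, resolveTimeoutMs) are hypothetical, and falling back to the default when the variable is blank or not a number is an assumption of this sketch, not documented behaviour.

```java
import java.util.Map;

final class TestTimeoutSketch {
    // Default taken from the description above: 10 seconds, in milliseconds.
    static final int DEFAULT_TIMEOUT_MS = 10_000;

    /**
     * Resolves the timeout from an environment-like map.
     * Assumption: a missing, blank, or malformed value falls back to the default.
     */
    static int resolveTimeoutMs(Map<String, String> env) {
        String raw = env.get("STORM_TEST_TIMEOUT_MS");
        if (raw == null || raw.trim().isEmpty()) {
            return DEFAULT_TIMEOUT_MS;
        }
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            return DEFAULT_TIMEOUT_MS;
        }
    }

    public static void main(String[] args) {
        // Prints the override when STORM_TEST_TIMEOUT_MS is set, else the default.
        System.out.println(resolveTimeoutMs(System.getenv()));
    }
}
```

Raising the variable is typically useful on slow build machines where the default wait is too short.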
public static void whileTimeout(Testing.Condition condition, Runnable body)
Continue to execute body repeatedly until condition is true or TEST_TIMEOUT_MS has passed.
condition - what we are waiting for
body - what to run in the loop
AssertionError - if the loop timed out.
public static void whileTimeout(long timeoutMs, Testing.Condition condition, Runnable body)
Continue to execute body repeatedly until condition is true or timeoutMs has passed.
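As a rough illustration of the polling loop described above, the following sketch re-implements the idea in plain Java rather than calling Storm. The names PollingSketch and runUntil and the use of BooleanSupplier are hypothetical. The sketch follows the wording above and stops once the supplier returns true; how Testing.Condition is actually evaluated should be checked against the Storm source for your version.

```java
import java.util.function.BooleanSupplier;

final class PollingSketch {
    /**
     * Runs body repeatedly until done reports true, and throws AssertionError
     * once timeoutMs of wall-clock time has elapsed without that happening.
     * Returns the number of times body ran.
     */
    static int runUntil(long timeoutMs, BooleanSupplier done, Runnable body) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        int iterations = 0;
        while (!done.getAsBoolean()) {
            if (System.currentTimeMillis() > deadline) {
                throw new AssertionError("Timed out after " + timeoutMs + " ms");
            }
            body.run();
            iterations++;
        }
        return iterations;
    }
}
```

A test would usually pass a body that advances simulated time or sleeps briefly, so the loop does not spin at full speed while waiting.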
timeoutMs - the number of ms to wait before timing out.
condition - what we are waiting for
body - what to run in the loop
AssertionError - if the loop timed out.
public static <T> boolean isEvery(Collection<T> data, Predicate<T> pred)
Convenience method for data.stream().allMatch(pred).
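The equivalence stated above can be shown with a small stand-alone sketch. The class name IsEverySketch is hypothetical and the method is a local re-implementation for illustration, not Storm's code. One consequence worth knowing is that an empty collection always passes, because allMatch is vacuously true on an empty stream.

```java
import java.util.Collection;
import java.util.List;
import java.util.function.Predicate;

final class IsEverySketch {
    /** True when every element of data satisfies pred (and for empty data). */
    static <T> boolean isEvery(Collection<T> data, Predicate<T> pred) {
        return data.stream().allMatch(pred);
    }

    public static void main(String[] args) {
        List<Integer> evens = List.of(2, 4, 6);
        System.out.println(isEvery(evens, n -> n % 2 == 0));
    }
}
```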
@Deprecated public static void withSimulatedTime(Runnable code)
Deprecated. Use
try (Time.SimulatedTime time = new Time.SimulatedTime()) {
...
}
Run with simulated time.
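The recommended replacement relies on try-with-resources, which closes the resource even when the body throws. The sketch below shows that pattern with a hypothetical stand-in class (FakeSimulatedTime) instead of Storm's Time.SimulatedTime, so it runs without Storm on the classpath. All names in it are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class ScopedResourceSketch {
    /** Hypothetical stand-in for a scoped test resource such as simulated time. */
    static final class FakeSimulatedTime implements AutoCloseable {
        private final List<String> log;

        FakeSimulatedTime(List<String> log) {
            this.log = log;
            log.add("start");
        }

        @Override
        public void close() {
            log.add("stop");
        }
    }

    /** Runs body inside the scoped resource and records the order of events. */
    static List<String> runScoped(Runnable body) {
        List<String> log = new ArrayList<>();
        try (FakeSimulatedTime time = new FakeSimulatedTime(log)) {
            body.run();
            log.add("body finished");
        } catch (RuntimeException e) {
            // The resource has already been closed by the time this block runs.
            log.add("body failed: " + e.getMessage());
        }
        return log;
    }
}
```

Compared with the callback style of the deprecated method, the scope of the resource is visible at the call site and the test body can declare its own checked exceptions.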
code - what to run
@Deprecated public static void withLocalCluster(TestJob code)
Deprecated. Use
try (LocalCluster cluster = new LocalCluster()) {
...
}
Run with a local cluster.
code - what to run
@Deprecated public static void withLocalCluster(MkClusterParam param, TestJob code)
Deprecated. Use
try (LocalCluster cluster = new LocalCluster.Builder()....build()) {
...
}
Run with a local cluster.
param - configs to set in the cluster
code - what to run
@Deprecated public static ILocalCluster getLocalCluster(Map<String,Object> clusterConf)
Deprecated. Use
try (LocalCluster cluster = new LocalCluster.Builder()....build()) {
...
}
Run with a local cluster.
clusterConf - some configs to set in the cluster
@Deprecated public static void withSimulatedTimeLocalCluster(TestJob code)
Deprecated. Use
try (LocalCluster cluster = new LocalCluster.Builder().withSimulatedTime().build()) {
...
}
Run with a local cluster.
code - what to run
@Deprecated public static void withSimulatedTimeLocalCluster(MkClusterParam param, TestJob code)
Deprecated. Use
try (LocalCluster cluster = new LocalCluster.Builder().withSimulatedTime()....build()) {
...
}
Run with a local cluster.
param - configs to set in the cluster
code - what to run
@Deprecated public static void withTrackedCluster(TestJob code)
Deprecated. Use
try (LocalCluster cluster = new LocalCluster.Builder().withTracked().build()) {
...
}
Run with a local cluster.
code - what to run
@Deprecated public static void withTrackedCluster(MkClusterParam param, TestJob code)
Deprecated. Use
try (LocalCluster cluster = new LocalCluster.Builder().withTracked()....build()) {
...
}
Run with a local tracked cluster.
param - configs to set in the cluster
code - what to run
@Deprecated public static int globalAmt(String id, String key)
In a tracked topology some metrics are tracked. This provides a way to get those metrics. This is intended mostly for internal testing.
id - the id of the tracked cluster
key - the name of the metric to get.
public static Testing.CapturedTopology<TrackedTopology> trackAndCaptureTopology(ILocalCluster cluster, StormTopology topology)
Track and capture a topology. This is intended mostly for internal testing.
public static Testing.CapturedTopology<StormTopology> captureTopology(StormTopology topology)
Rewrites a topology so that all the tuples flowing through it are captured.
topology - the topology to rewrite
public static Map<String,List<FixedTuple>> completeTopology(ILocalCluster cluster, StormTopology topology) throws InterruptedException, org.apache.storm.thrift.TException
Run a topology to completion capturing all of the messages that are emitted. This only works when all of the spouts are instances of CompletableSpout.
cluster - the cluster to submit the topology to
topology - the topology itself
org.apache.storm.thrift.TException - on any error from nimbus
InterruptedException
public static Map<String,List<FixedTuple>> completeTopology(ILocalCluster cluster, StormTopology topology, CompleteTopologyParam param) throws org.apache.storm.thrift.TException, InterruptedException
Run a topology to completion capturing all of the messages that are emitted. This only works when all of the spouts are instances of CompletableSpout or are overwritten by MockedSources in param.
cluster - the cluster to submit the topology to
topology - the topology itself
param - parameters to describe how to complete a topology
org.apache.storm.thrift.TException - on any error from nimbus.
InterruptedException
public static void simulateWait(ILocalCluster cluster) throws InterruptedException
If using simulated time, simulate waiting for 10 seconds. This is intended for internal testing only.
InterruptedException
public static List<List<Object>> readTuples(Map<String,List<FixedTuple>> results, String componentId)
Get all of the tuples from a given component on the default stream.
results - the results of running a completed topology
componentId - the id of the component to look at
public static List<List<Object>> readTuples(Map<String,List<FixedTuple>> results, String componentId, String streamId)
Get all of the tuples from a given component on a given stream.
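The lookup described above amounts to selecting one component's captured tuples and keeping those on the requested stream. The sketch below models that in plain Java with a hypothetical stand-in type (CapturedTuple) in place of FixedTuple, so it is not Storm's implementation. Treating the results map as keyed by component id and returning an empty list for an unknown component are assumptions of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class ReadTuplesSketch {
    /** Hypothetical stand-in for FixedTuple: a stream id plus the emitted values. */
    static final class CapturedTuple {
        final String stream;
        final List<Object> values;

        CapturedTuple(String stream, List<Object> values) {
            this.stream = stream;
            this.values = values;
        }
    }

    /** Returns the values of every tuple that componentId emitted on streamId. */
    static List<List<Object>> readTuples(Map<String, List<CapturedTuple>> results,
                                         String componentId, String streamId) {
        List<List<Object>> out = new ArrayList<>();
        // Assumption: an unknown component yields an empty result, not an error.
        for (CapturedTuple tuple : results.getOrDefault(componentId, List.of())) {
            if (streamId.equals(tuple.stream)) {
                out.add(tuple.values);
            }
        }
        return out;
    }
}
```

The two-argument overload corresponds to calling this with Storm's default stream id, which is the string "default".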
results - the results of running a completed topology
componentId - the id of the component to look at
streamId - the id of the stream to look for.
@Deprecated public static TrackedTopology mkTrackedTopology(ILocalCluster cluster, StormTopology topology)
Deprecated. Use TrackedTopology directly.
Create a tracked topology.
public static void trackedWait(Testing.CapturedTopology<TrackedTopology> topo)
Simulated time wait for a tracked topology. This is intended for internal testing.
public static void trackedWait(Testing.CapturedTopology<TrackedTopology> topo, Integer amt)
Simulated time wait for a tracked topology. This is intended for internal testing.
public static void trackedWait(Testing.CapturedTopology<TrackedTopology> topo, Integer amt, Integer timeoutMs)
Simulated time wait for a tracked topology. This is intended for internal testing.
public static void trackedWait(TrackedTopology topo)
Simulated time wait for a tracked topology. This is intended for internal testing.
public static void trackedWait(TrackedTopology topo, Integer amt)
Simulated time wait for a tracked topology. This is intended for internal testing.
public static void trackedWait(TrackedTopology topo, Integer amt, Integer timeoutMs)
Simulated time wait for a tracked topology. This is intended for internal testing.
public static void advanceClusterTime(ILocalCluster cluster, Integer secs) throws InterruptedException
Simulated time wait for a cluster. This is intended for internal testing.
InterruptedException
public static void advanceClusterTime(ILocalCluster cluster, Integer secs, Integer step) throws InterruptedException
Simulated time wait for a cluster. This is intended for internal testing.
InterruptedException
public static <T> Map<T,Integer> multiset(Collection<T> c)
Count how many times each element appears in the Collection.
c - a collection of values
public static <T> boolean multiseteq(Collection<T> a, Collection<T> b)
Check if two collections are equivalent ignoring the order of elements.
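Counting occurrences and comparing the counts is one straightforward way to get order-insensitive equality. The sketch below re-implements both helpers in plain Java for illustration. The class name MultisetSketch is hypothetical, and Storm's own implementation may differ in detail.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class MultisetSketch {
    /** Counts how many times each element appears in c. */
    static <T> Map<T, Integer> multiset(Collection<T> c) {
        Map<T, Integer> counts = new HashMap<>();
        for (T item : c) {
            counts.merge(item, 1, Integer::sum);
        }
        return counts;
    }

    /** True when a and b hold the same elements with the same multiplicities. */
    static <T> boolean multiseteq(Collection<T> a, Collection<T> b) {
        return multiset(a).equals(multiset(b));
    }

    public static void main(String[] args) {
        System.out.println(multiset(List.of("a", "b", "a")));
        System.out.println(multiseteq(List.of(1, 2, 2), List.of(2, 1, 2)));
    }
}
```

Unlike comparing two sets, this keeps duplicates significant: a collection holding an element twice is not equal to one holding it once. That matters when asserting on emitted tuples, whose arrival order is usually not deterministic.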
public static Tuple testTuple(List<Object> values)
Create a Tuple for use with testing.
values - the values to appear in the tuple
public static Tuple testTuple(List<Object> values, MkTupleParam param)
Create a Tuple for use with testing.
values - the values to appear in the tuple
param - parameters describing more details about the tuple
Copyright © 2022 The Apache Software Foundation. All rights reserved.