Historically, Storm provided Spout and Bolt APIs for expressing streaming computations. Though these APIs are fairly simple to use, they offer no reusable constructs for expressing common streaming operations like filtering, transformations, windowing, joins, aggregations and so on.

The Stream API builds on top of Storm’s spouts and bolts to provide a typed API for expressing streaming computations, and supports functional-style operations such as map-reduce.

Concepts

Conceptually, a Stream can be thought of as a stream of messages flowing through a pipeline. A Stream may be generated by reading messages out of a source like a spout, or by transforming other streams. For example:

// imports
import org.apache.storm.streams.Stream;
import org.apache.storm.streams.StreamBuilder;
...

StreamBuilder builder = new StreamBuilder();

// a stream of sentences obtained from a source spout
Stream<String> sentences = builder.newStream(new RandomSentenceSpout()).map(tuple -> tuple.getString(0));

// a stream of words obtained by transforming (splitting) the stream of sentences
Stream<String> words = sentences.flatMap(s -> Arrays.asList(s.split(" ")));

// output operation that prints the words to console
words.forEach(w -> System.out.println(w));

Most stream operations accept parameters that describe user-specified behavior, typically via lambda expressions like s -> Arrays.asList(s.split(" ")), as in the above example.

A Stream supports two kinds of operations:

  1. Transformations that produce another stream from the current stream (like the flatMap operation in the example above).
  2. Output operations that produce a result (like the forEach operation in the example above).

Stream Builder

StreamBuilder provides the builder APIs to create a new stream. Typically a spout forms the source of a stream.

StreamBuilder builder = new StreamBuilder();
Stream<Tuple> sentences = builder.newStream(new TestSentenceSpout());

The StreamBuilder tracks the overall pipeline of operations expressed via the Stream. One can then create the Storm topology via build() and submit it like a normal Storm topology via StormSubmitter.

StormSubmitter.submitTopologyWithProgressBar("test", new Config(), builder.build());

Value mapper

Value mappers can be used to extract specific fields from the tuples emitted by a spout to produce a typed stream of values. Value mappers are passed as arguments to StreamBuilder.newStream.

StreamBuilder builder = new StreamBuilder();

// extract the first field from the tuple to get a Stream<String> of sentences
Stream<String> sentences = builder.newStream(new TestWordSpout(), new ValueMapper<String>(0));

Storm provides strongly typed tuples via the Pair and Tuple classes (Tuple3 up to Tuple10). One can use a TupleValueMapper to produce a stream of typed tuples as shown below.

// extract first three fields of the tuple emitted by the spout to produce a stream of typed tuples.
Stream<Tuple3<String, Integer, Long>> stream = builder.newStream(new TestSpout(), TupleValueMappers.of(0, 1, 2));

Stream APIs

Storm’s streaming APIs (defined in Stream and PairStream) currently support a wide range of operations such as transformations, filters, windowing, aggregations, branching, joins, stateful, output and debugging operations.

Basic transformations

filter

filter returns a stream consisting of the elements of the stream that match the given Predicate (those for which the predicate returns true).

Stream<String> logs = ...
Stream<String> errors = logs.filter(line -> line.contains("ERROR"));

In the above example, log lines containing ‘ERROR’ are filtered into an error stream which can then be further processed.

map

map returns a stream consisting of the result of applying the given mapping function to the values of the stream.

Stream<String> words = ...
Stream<Integer> wordLengths = words.map(String::length);

The example generates a stream of word lengths from a stream of words by applying the String.length function on each value. Note that the type of the resultant stream of a map operation can be different from that of the original stream.

flatMap

flatMap returns a stream consisting of the results of replacing each value of the stream with the contents produced by applying the provided mapping function to each value. This is similar to map but each value can be mapped to 0 or more values.

Stream<String> sentences = ...
Stream<String> words = sentences.flatMap(s -> Arrays.asList(s.split(" ")));

In the above example, the lambda function splits each value in the stream to a list of words and the flatMap function generates a flattened stream of words out of it.
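
The three transformations above behave much like their namesakes in java.util.stream. The following sketch is not Storm code: it applies the same lambdas to small in-memory lists, purely to show what each operation yields. The class name BasicTransformsSketch and the sample data are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java analogy built on java.util.stream, not on the Storm Stream API.
class BasicTransformsSketch {

    // filter: keep only the lines for which the predicate returns true
    static List<String> errors(List<String> logs) {
        return logs.stream()
                   .filter(line -> line.contains("ERROR"))
                   .collect(Collectors.toList());
    }

    // map: exactly one output value per input value; the element type may change
    static List<Integer> lengths(List<String> words) {
        return words.stream()
                    .map(String::length)
                    .collect(Collectors.toList());
    }

    // flatMap: zero or more output values per input value, flattened into one stream
    static List<String> words(List<String> sentences) {
        return sentences.stream()
                        .flatMap(s -> Arrays.stream(s.split(" ")))
                        .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(errors(Arrays.asList("INFO start", "ERROR disk full")));
        System.out.println(lengths(Arrays.asList("storm", "api")));
        System.out.println(words(Arrays.asList("the cow", "jumped over")));
    }
}
```

One difference worth noting: in java.util.stream the flatMap function returns a stream, whereas the Storm examples above return a list (Arrays.asList) from the flatMap lambda.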

Windowing

A window operation produces a windowed stream consisting of the elements that fall within the window as specified by the window parameter. All the windowing options supported in the underlying windowed bolts are supported via the Stream APIs.

Stream<T> windowedStream = stream.window(Window<?, ?> windowConfig);

The windowConfig parameter specifies the windowing config like sliding or tumbling windows based on time duration or event count.

// time based sliding window
stream.window(SlidingWindows.of(Duration.minutes(10), Duration.minutes(1)));

// count based sliding window
stream.window(SlidingWindows.of(Count.of(10), Count.of(2)));

// tumbling window
stream.window(TumblingWindows.of(Duration.seconds(10)));

// specifying timestamp field for event time based processing and a late tuple stream.
stream.window(TumblingWindows.of(Duration.seconds(10))
                     .withTimestampField("ts")
                     .withLateTupleStream("late_events"));

A windowing operation splits the continuous stream of values into subsets and is necessary for performing operations like Joins and Aggregations.
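
To make the difference between sliding and tumbling windows concrete, here is a minimal plain-Java sketch of count-based windows over a finite list. It is not Storm code; the helper names are hypothetical, and the treatment of the first, partially filled windows is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration of count-based windows (hypothetical helpers, not Storm code).
class CountWindowSketch {

    // Every `slide` values, emit a window holding the most recent `length` values
    // (fewer while fewer than `length` values have arrived; this is an assumption).
    static <T> List<List<T>> slidingWindows(List<T> values, int length, int slide) {
        List<List<T>> windows = new ArrayList<>();
        for (int end = slide; end <= values.size(); end += slide) {
            int start = Math.max(0, end - length);
            windows.add(new ArrayList<>(values.subList(start, end)));
        }
        return windows;
    }

    // A tumbling window is a sliding window whose slide equals its length,
    // so consecutive windows never overlap.
    static <T> List<List<T>> tumblingWindows(List<T> values, int length) {
        return slidingWindows(values, length, length);
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(1, 2, 3, 4, 5, 6);
        System.out.println(slidingWindows(values, 4, 2));
        System.out.println(tumblingWindows(values, 3));
    }
}
```

With a sliding window a value can appear in several consecutive windows; with a tumbling window each value belongs to exactly one window.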

Transformation to key-value pairs

mapToPair and flatMapToPair

These operations transform a Stream of values into a stream of key-value pairs.

Stream<Integer> integers = ... // 1, 2, 3, 4, ...
PairStream<Integer, Integer> squares = integers.mapToPair(x -> Pair.of(x, x*x)); // (1, 1), (2, 4), (3, 9), (4, 16), ...

A key-value pair stream is required for operations like groupByKey, aggregateByKey, joins etc.

Aggregations

Aggregate operations aggregate the values (or key-values) in a stream. Typically the aggregation operations are performed on a windowed stream where the aggregate results are emitted on each window activation.

aggregate and reduce

aggregate and reduce compute a global aggregation, i.e. the values across all partitions are forwarded to a single task for computing the aggregate.

Stream<Long> numbers = ...
// aggregate the numbers and produce a stream of last 10 sec sums.
Stream<Long> sums = numbers.window(TumblingWindows.of(Duration.seconds(10))).aggregate(new Sum());

// the last 10 sec sums computed using reduce
Stream<Long> sums = numbers.window(...).reduce((x, y) -> x + y);

aggregate and reduce differ in the way the aggregate results are computed.

A reduce operation repeatedly applies the given reducer, reducing two values to a single value until only one value is left. This may not be feasible or easy for all kinds of aggregations (e.g. avg).

An aggregate operation does a mutable reduction. A mutable reduction accumulates results into an accumulator as it processes the values.

The aggregation operations (aggregate and reduce) automatically do a local aggregation whenever possible before doing the network shuffle, to minimize the number of messages transmitted over the network. For example, to compute a sum, a per-partition partial sum is computed and only the partial sums are transferred over the network to the target bolt, where the partial sums are merged to produce the final sum. A CombinerAggregator interface is used as the argument of aggregate to enable this.

For example the Sum (passed as the argument of aggregate in the example above) can be implemented as a CombinerAggregator as follows.

public class Sum implements CombinerAggregator<Long, Long, Long> {

    // The initial value of the sum
    @Override
    public Long init() {
        return 0L;
    }

    // Updates the sum by adding the value (this could be a partial sum)
    @Override
    public Long apply(Long aggregate, Long value) {
        return aggregate + value;
    }

    // merges the partial sums
    @Override
    public Long merge(Long accum1, Long accum2) {
        return accum1 + accum2;
    }

    // extract result from the accumulator (here the accumulator and result is the same)
    @Override
    public Long result(Long accum) {
        return accum;
    }
}
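
The text above notes that an average is awkward to express with reduce. The following sketch shows how the same four-method shape (init, apply, merge, result) could handle it by keeping a (sum, count) accumulator. So that it compiles without Storm, it declares a local stand-in interface named Combiner instead of using CombinerAggregator; Combiner, Average and AverageSketch are hypothetical names, and the two lists merely simulate two partitions.

```java
import java.util.Arrays;
import java.util.List;

// Local stand-in that mirrors the four methods of the Sum example above;
// declared here only so that the sketch is self-contained.
interface Combiner<T, A, R> {
    A init();
    A apply(A accumulator, T value);
    A merge(A accum1, A accum2);
    R result(A accum);
}

// Average keeps {sum, count} in the accumulator and divides only at the end.
class Average implements Combiner<Long, long[], Double> {
    @Override
    public long[] init() {
        return new long[] {0L, 0L}; // {sum, count}
    }

    @Override
    public long[] apply(long[] acc, Long value) {
        return new long[] {acc[0] + value, acc[1] + 1};
    }

    @Override
    public long[] merge(long[] a, long[] b) {
        return new long[] {a[0] + b[0], a[1] + b[1]};
    }

    @Override
    public Double result(long[] acc) {
        return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1];
    }
}

class AverageSketch {
    // Folds one partition into a partial accumulator (the local aggregation step).
    static <T, A> A partial(Combiner<T, A, ?> combiner, List<T> partition) {
        A acc = combiner.init();
        for (T value : partition) {
            acc = combiner.apply(acc, value);
        }
        return acc;
    }

    // Merges the per-partition partials and extracts the final result.
    static double average(List<Long> partition1, List<Long> partition2) {
        Average avg = new Average();
        return avg.result(avg.merge(partial(avg, partition1), partial(avg, partition2)));
    }

    public static void main(String[] args) {
        System.out.println(average(Arrays.asList(1L, 2L, 3L), Arrays.asList(10L)));
    }
}
```

Carrying the count alongside the sum is what makes the partial results mergeable: averaging the two per-partition averages (2.0 and 10.0) would give 6.0, whereas the true average of the four values is 4.0.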

aggregateByKey and reduceByKey

These are similar to the aggregate and reduce operations but do the aggregation per key.

aggregateByKey aggregates the values for each key of the stream using the given Aggregator.

Stream<String> words = ...                                                  // a windowed stream of words
PairStream<String, Long> wordCounts = words.mapToPair(w -> Pair.of(w, 1))   // convert to a stream of (word, 1) pairs
                                           .aggregateByKey(new Count<>());  // compute counts per word

reduceByKey performs a reduction on the values for each key of this stream by repeatedly applying the reducer.

Stream<String> words = ...                                                    // a windowed stream of words
PairStream<String, Integer> wordCounts = words.mapToPair(w -> Pair.of(w, 1))  // convert to a stream of (word, 1) pairs
                                              .reduceByKey((x, y) -> x + y);  // compute counts per word

As with the global aggregate/reduce, a per-partition local aggregate (per key) is computed and the partial results are sent to the target bolts, where they are merged to produce the final aggregate.
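
The per-key variant of the local aggregation can be sketched in plain Java as follows. This is only an illustration of the idea, not Storm code: the helper names are hypothetical and each list stands in for the words seen by one partition.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of per-key local aggregation followed by a merge
// (hypothetical helper names, not Storm code).
class PerKeyCountSketch {

    // Local step: count the words seen by a single partition.
    static Map<String, Long> partialCounts(List<String> partition) {
        Map<String, Long> counts = new HashMap<>();
        for (String word : partition) {
            counts.merge(word, 1L, Long::sum);
        }
        return counts;
    }

    // Merge step: only (word, partialCount) entries need to reach the target.
    static Map<String, Long> mergeCounts(List<Map<String, Long>> partials) {
        Map<String, Long> merged = new HashMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((word, count) -> merged.merge(word, count, Long::sum));
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Long> p1 = partialCounts(Arrays.asList("a", "b", "a"));
        Map<String, Long> p2 = partialCounts(Arrays.asList("b", "c"));
        System.out.println(mergeCounts(Arrays.asList(p1, p2)));
    }
}
```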

groupByKey

groupByKey on a stream of key-value pairs returns a new stream where the values are grouped by the keys.

// a stream of (user, score) pairs e.g. ("alice", 10), ("bob", 15), ("bob", 20), ("alice", 11), ("alice", 13)
PairStream<String, Integer> scores = ...

// list of scores per user in the last window, e.g. ("alice", [10, 11, 13]), ("bob", [15, 20])
PairStream<String, Iterable<Integer>> userScores = scores.window(...).groupByKey();

countByKey

countByKey counts the values for each key of this stream.

Stream<String> words = ...                                                  // a windowed stream of words
PairStream<String, Long> wordCounts = words.mapToPair(w -> Pair.of(w, 1))   // convert to a stream of (word, 1) pairs
                                           .countByKey();                   // compute counts per word

Internally countByKey uses aggregateByKey to compute the count.

Repartition

A repartition operation re-partitions the current stream and returns a new stream with the specified number of partitions. Further operations on the resultant stream execute at that level of parallelism. Repartition can be used to increase or reduce the parallelism of the operations in the stream.

The initial number of partitions can also be specified while creating the stream (via StreamBuilder.newStream).

// Stream 's1' will have 2 partitions and operations on s1 will execute at this level of parallelism
Stream<String> s1 = builder.newStream(new TestWordSpout(), new ValueMapper<String>(0), 2);

// Stream 's2' and further operations will have three partitions
// (function1 is assumed here to map a String to an Integer)
Stream<Integer> s2 = s1.map(function1).repartition(3);

// perform a map operation on s2 and print the result
s2.map(function2).print();

Note: a repartition operation implies network transfer. In the above example the first map operation (function1) would be executed at a parallelism of 2 (on two partitions of s1), whereas the second map operation (function2) would be executed at a parallelism of 3 (on three partitions of s2). This also means that the first and second map operations have to be executed on two separate bolts and involve network transfer.

Output operations

Output operations push out the transformed values in the stream to the console, external sinks like databases, files or even Storm bolts.

print

print prints the values in the stream to console. For example,

// transforms words to uppercase and prints to the console
words.map(String::toUpperCase).print();

peek

peek returns a stream consisting of the elements of the stream, additionally performing the provided action on each element as they are consumed from the resulting stream. This can be used to ‘inspect’ the values flowing at any stage in a stream.

builder.newStream(...).flatMap(s -> Arrays.asList(s.split(" ")))
       // print the results of the flatMap operation as the values flow across the stream.
       .peek(s -> System.out.println(s))
       .mapToPair(w -> Pair.of(w, 1));

forEach

This is the most generic output operation and can be used to execute arbitrary code for each value in the stream, like storing the results into an external database, file and so on.

stream.forEach(value -> {
    // log it
    LOG.debug(value);
    // store the value into a db and so on...
    statement.executeUpdate(..);
  }
);

to

This allows one to plug in existing bolts as sinks.

// The redisBolt is a standard storm bolt
IRichBolt redisBolt = new RedisStoreBolt(poolConfig, storeMapper);
...
// generate the word counts and store it in redis using redis bolt
builder.newStream(new TestWordSpout(), new ValueMapper<String>(0))
       .mapToPair(w -> Pair.of(w, 1))
       .countByKey()
       // the (word, count) pairs are forwarded to the redisBolt which stores it in redis
       .to(redisBolt);

Note that this will provide guarantees only based on what the bolt provides.

Branch

A branch operation can be used to express if-then-else logic on streams.

Stream<T>[] streams = stream.branch(Predicate<T>... predicates);

The predicates are applied in the given order to the values of the stream and the result is forwarded to the corresponding (index based) result stream based on the first predicate that matches. If none of the predicates match a value, that value is dropped.

For example,

Stream<Integer>[] streams = builder.newStream(new RandomIntegerSpout(), new ValueMapper<Integer>(0))
                                   .branch(x -> (x % 2) == 0, 
                                          x -> (x % 2) == 1);
Stream<Integer> evenNumbers = streams[0];
Stream<Integer> oddNumbers = streams[1];
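
The first-match rule described above can be illustrated with a small plain-Java sketch. It is not Storm code: the branch helper below is a hypothetical in-memory version that routes each value to the output of the first predicate it satisfies and drops values that match none.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java illustration of first-match routing (hypothetical helper, not Storm code).
class BranchSketch {

    // Returns one output list per predicate. Each value goes to the list of the
    // first predicate it satisfies; values matching no predicate are dropped.
    static <T> List<List<T>> branch(List<T> values, List<Predicate<T>> predicates) {
        List<List<T>> outputs = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            outputs.add(new ArrayList<>());
        }
        for (T value : values) {
            for (int i = 0; i < predicates.size(); i++) {
                if (predicates.get(i).test(value)) {
                    outputs.get(i).add(value);
                    break; // later predicates are not consulted
                }
            }
        }
        return outputs;
    }

    // Example routing: negatives first, then even numbers; positive odd numbers are dropped.
    static List<List<Integer>> negativesThenEvens(List<Integer> values) {
        List<Predicate<Integer>> predicates = new ArrayList<>();
        predicates.add(x -> x < 0);
        predicates.add(x -> x % 2 == 0);
        return branch(values, predicates);
    }

    public static void main(String[] args) {
        System.out.println(negativesThenEvens(Arrays.asList(-4, 1, 2, 3, 6)));
    }
}
```

Note that -4 satisfies both predicates but lands only in the first output, because predicate order decides the route.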

Joins

A join operation joins the values of one stream with the values having the same key from another stream.

PairStream<Long, Long> squares = ... // (1, 1), (2, 4), (3, 9) ...
PairStream<Long, Long> cubes = ... // (1, 1), (2, 8), (3, 27) ...

// join the squares and cubes streams to produce (1, [1, 1]), (2, [4, 8]), (3, [9, 27]) ...
PairStream<Long, Pair<Long, Long>> joined = squares.window(TumblingWindows.of(Duration.seconds(5))).join(cubes);

Joins are typically invoked on a windowed stream, joining the key-values that arrived on each stream in the current window. The parallelism of the stream on which the join is invoked is carried forward to the joined stream. An optional ValueJoiner can be passed as an argument to join to specify how to join the two values for each matching key (the default behavior is to return a Pair of the values from both streams).

Left, right and full outer joins are supported.
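
The difference between an inner and a left outer join within a single window can be sketched in plain Java as below. This is not Storm code: the helpers are hypothetical, each side holds at most one value per key for brevity, and representing a missing right-hand value as null is an assumption made for the illustration. The BiFunction argument plays the role that a ValueJoiner plays in the text above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Plain-Java illustration of join semantics within one window
// (hypothetical helpers, not Storm code).
class JoinSketch {

    // Inner join: emit a result only for keys present on both sides.
    static <K, V1, V2, R> Map<K, R> innerJoin(Map<K, V1> left, Map<K, V2> right,
                                              BiFunction<V1, V2, R> joiner) {
        Map<K, R> joined = new LinkedHashMap<>();
        left.forEach((key, leftValue) -> {
            if (right.containsKey(key)) {
                joined.put(key, joiner.apply(leftValue, right.get(key)));
            }
        });
        return joined;
    }

    // Left outer join: every left key is emitted; a missing right value is
    // passed to the joiner as null (an assumption of this sketch).
    static <K, V1, V2, R> Map<K, R> leftOuterJoin(Map<K, V1> left, Map<K, V2> right,
                                                  BiFunction<V1, V2, R> joiner) {
        Map<K, R> joined = new LinkedHashMap<>();
        left.forEach((key, leftValue) ->
                joined.put(key, joiner.apply(leftValue, right.get(key))));
        return joined;
    }

    public static void main(String[] args) {
        Map<Long, Long> squares = new LinkedHashMap<>();
        squares.put(1L, 1L);
        squares.put(2L, 4L);
        squares.put(3L, 9L);
        Map<Long, Long> cubes = new LinkedHashMap<>();
        cubes.put(1L, 1L);
        cubes.put(2L, 8L);
        // Key 3 has no cube in this window, so only the outer join emits it.
        System.out.println(innerJoin(squares, cubes, (s, c) -> s + "/" + c));
        System.out.println(leftOuterJoin(squares, cubes, (s, c) -> s + "/" + c));
    }
}
```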

CoGroupByKey

coGroupByKey groups the values of this stream with the values having the same key from the other stream.

// a stream of (key, value) pairs e.g. (k1, v1), (k2, v2), (k2, v3)
PairStream<String, String> stream1 = ...

// another stream of (key, value) pairs e.g. (k1, x1), (k1, x2), (k3, x3)
PairStream<String, String> stream2 = ...

// the co-grouped values per key in the last window, e.g. (k1, ([v1], [x1, x2])), (k2, ([v2, v3], [])), (k3, ([], [x3]))
PairStream<String, Pair<Iterable<String>, Iterable<String>>> coGroupedStream = stream1.window(...).coGroupByKey(stream2);
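
The grouping in the comment above can be reproduced with a minimal plain-Java sketch. It is not Storm code: coGroup is a hypothetical helper, each (key, value) pair is represented as a two-element String array, and the two inner lists stand in for the pair of per-stream value collections.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration of co-grouping (hypothetical helper, not Storm code).
class CoGroupSketch {

    // For every key seen on either side, collect the left values and the right
    // values into two separate lists; a side with no values yields an empty list.
    static Map<String, List<List<String>>> coGroup(List<String[]> left, List<String[]> right) {
        Map<String, List<List<String>>> grouped = new TreeMap<>();
        for (String[] kv : left) {
            slot(grouped, kv[0]).get(0).add(kv[1]);
        }
        for (String[] kv : right) {
            slot(grouped, kv[0]).get(1).add(kv[1]);
        }
        return grouped;
    }

    // Returns the [leftValues, rightValues] entry for a key, creating it if needed.
    private static List<List<String>> slot(Map<String, List<List<String>>> grouped, String key) {
        List<List<String>> entry = grouped.get(key);
        if (entry == null) {
            entry = new ArrayList<>();
            entry.add(new ArrayList<>());
            entry.add(new ArrayList<>());
            grouped.put(key, entry);
        }
        return entry;
    }

    public static void main(String[] args) {
        List<String[]> stream1 = Arrays.asList(
                new String[] {"k1", "v1"}, new String[] {"k2", "v2"}, new String[] {"k2", "v3"});
        List<String[]> stream2 = Arrays.asList(
                new String[] {"k1", "x1"}, new String[] {"k1", "x2"}, new String[] {"k3", "x3"});
        System.out.println(coGroup(stream1, stream2));
    }
}
```

Unlike an inner join, co-grouping keeps keys that appear on only one side (k2 and k3 here), pairing them with an empty list for the other side.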

State

Storm provides APIs for applications to save and update the state of their computations and also to query that state.

updateStateByKey

updateStateByKey updates the state by applying a given state update function to the previous state and the new value for the key. updateStateByKey can be invoked with either an initial value for the state and a state update function or by directly providing a StateUpdater implementation.

PairStream<String, Long> wordCounts = ...
// Update the word counts in the state; here the first argument 0L is the initial value for the state and 
// the second argument is a function that adds the count to the current value in the state.
StreamState<String, Long> streamState = wordCounts.updateStateByKey(0L, (state, count) -> state + count);
streamState.toPairStream().print();

The state value can be of any type. In the above example it is of type Long and stores the word count.

Internally, Storm uses stateful bolts for storing the state. The Storm config topology.state.provider can be used to choose the state provider implementation. For example, set this to org.apache.storm.redis.state.RedisKeyValueStateProvider for a Redis-based state store.
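
The update rule behind updateStateByKey (previous state plus new value gives next state, starting from an initial value) can be sketched in plain Java as follows. This is not Storm code and says nothing about how stateful bolts persist data: KeyedStateSketch is a hypothetical in-memory class, and its query method is included only so the result can be inspected.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Plain-Java illustration of a keyed state update (hypothetical class, not Storm code).
class KeyedStateSketch<K, S, V> {
    private final Map<K, S> state = new HashMap<>();
    private final S initialValue;
    private final BiFunction<S, V, S> updater;

    KeyedStateSketch(S initialValue, BiFunction<S, V, S> updater) {
        this.initialValue = initialValue;
        this.updater = updater;
    }

    // Applies the update function to the previous state (or the initial value
    // for a key seen for the first time) and the new value, then stores the result.
    S update(K key, V value) {
        S previous = state.getOrDefault(key, initialValue);
        S next = updater.apply(previous, value);
        state.put(key, next);
        return next;
    }

    // Looks up the current state for a key; null if the key was never updated.
    S query(K key) {
        return state.get(key);
    }

    public static void main(String[] args) {
        KeyedStateSketch<String, Long, Long> counts =
                new KeyedStateSketch<>(0L, (previous, count) -> previous + count);
        counts.update("storm", 2L);
        counts.update("storm", 3L);
        counts.update("api", 1L);
        System.out.println(counts.query("storm"));
    }
}
```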

stateQuery

stateQuery can be used to query the state (updated by updateStateByKey). The StreamState returned by the updateStateByKey operation has to be used for querying stream state. The values in the stream are used as the keys to query the state.


// The stream of words emitted by the QuerySpout is used as the keys to query the state.
builder.newStream(new QuerySpout(), new ValueMapper<String>(0))
// Queries the state and emits the matching (key, value) as results. 
// The stream state returned by updateStateByKey is passed as the argument to stateQuery.
.stateQuery(streamState).print();

Guarantees

Currently, topologies built using the Stream API provide an at-least-once guarantee.

Note that only the updateStateByKey operation currently executes on an underlying StatefulBolt. The other stateful operations (join, windowing, aggregation etc.) execute on an IRichBolt and store their state in memory. They rely on Storm’s acking and replay mechanisms to rebuild the state.

In the future, the underlying framework of the Stream API would be enhanced to provide exactly-once guarantees.

Example

Here’s a word count topology expressed using the Stream API:

StreamBuilder builder = new StreamBuilder();

builder
   // A stream of random sentences with two partitions
   .newStream(new RandomSentenceSpout(), new ValueMapper<String>(0), 2)
   // a two seconds tumbling window
   .window(TumblingWindows.of(Duration.seconds(2)))
   // split the sentences to words
   .flatMap(s -> Arrays.asList(s.split(" ")))
   // create a stream of (word, 1) pairs
   .mapToPair(w -> Pair.of(w, 1))
   // compute the word counts in the last two second window
   .countByKey()
   // print the results to stdout
   .print();

The RandomSentenceSpout is a regular Storm spout that continuously emits random sentences. The stream of sentences is split into two-second windows, and the word count within each window is computed and printed.

The stream can then be submitted just like a regular topology as shown below.

  Config config = new Config();
  config.setNumWorkers(1);
  StormSubmitter.submitTopologyWithProgressBar("topology-name", config, builder.build());

More examples are available under storm-starter which will help you get started.