Version: 2.5.0

Joining Streams in Storm Core

Storm core supports joining multiple data streams into one with the help of JoinBolt. JoinBolt is a Windowed bolt, i.e. it waits for the configured window duration to match up the tuples among the streams being joined. This helps align the streams within a Window boundary.

Each of JoinBolt's incoming data streams must be Fields Grouped on a single field. A stream should only be joined with the other streams using the field on which it has been FieldsGrouped.
Knowing this will help understand the join syntax described below.

Performing Joins

Consider the following SQL join involving 4 tables:

select  userId, key4, key2, key3
from        table1
inner join  table2  on table2.userId =  table1.key1
inner join  table3  on table3.key3   =  table2.userId
left join   table4  on table4.key4   =  table3.key3

Similar joins could be expressed on tuples generated by 4 spouts using JoinBolt:

JoinBolt jbolt =  new JoinBolt("spout1", "key1")                   // from        spout1  
                    .join     ("spout2", "userId",  "spout1")      // inner join  spout2  on spout2.userId = spout1.key1
                    .join     ("spout3", "key3",    "spout2")      // inner join  spout3  on spout3.key3   = spout2.userId   
                    .leftJoin ("spout4", "key4",    "spout3")      // left join   spout4  on spout4.key4   = spout3.key3
                    .select  ("userId, key4, key2, spout3:key3")   // chose output fields
                    .withTumblingWindow( new Duration(10, TimeUnit.MINUTES) ) ;

topoBuilder.setBolt("joiner", jbolt, 1)
            .fieldsGrouping("spout1", new Fields("key1") )
            .fieldsGrouping("spout2", new Fields("userId") )
            .fieldsGrouping("spout3", new Fields("key3") )
            .fieldsGrouping("spout4", new Fields("key4") );

The bolt constructor takes two arguments. The 1st argument introduces the data from spout1 to be the first stream and specifies that it will always use field key1 when joining this with the others streams. The name of the component specified must refer to the spout or bolt that is directly connected to the Join bolt. Here data received from spout1 must be fields grouped on key1. Similarly, each of the leftJoin() and join() method calls introduce a new stream along with the field to use for the join. As seen in above example, the same FieldsGrouping requirement applies to these streams as well. The 3rd argument to the join methods refers to another stream with which to join.

The select() method is used to specify the output fields. The argument to select is a comma separated list of fields. Individual field names can be prefixed with a stream name to disambiguate between the same field name occurring in multiple streams as follows: .select("spout3:key3, spout4:key3"). Nested tuple types are supported if the nesting has been done using Maps. For example outer.inner.innermost refers to a field that is nested three levels deep where outer and inner are of type Map.

Stream name prefix is not allowed for the fields in any of the join() arguments, but nested fields are supported.

The call to withTumblingWindow() above, configures the join window to be a 10 minute tumbling window. Since JoinBolt is a Windowed Bolt, we can also use the withWindow method to configure it as a sliding window (see tips section below).

Stream names and Join order

  • Stream names must be introduced (in constructor or as 1st arg to various join methods) before being referred to (in the 3rd argument of the join methods). Forward referencing of stream names, as shown below, is not allowed:
new JoinBolt( "spout1", "key1")                 
  .join     ( "spout2", "userId",  "spout3") //not allowed. spout3 not yet introduced
  .join     ( "spout3", "key3",    "spout1")
  • Internally, the joins will be performed in the order expressed by the user.

Joining based on Stream names

For simplicity, Storm topologies often use the default stream. Topologies can also use named streams instead of default streams. To support such topologies, JoinBolt can be configured to use stream names, instead of source component (spout/bolt) names, via the constructor's first argument:

new JoinBolt(JoinBolt.Selector.STREAM,  "stream1", "key1")
                                  .join("stream2", "key2")
    ...

The first argument JoinBolt.Selector.STREAM informs the bolt that stream1/2/3/4 refer to named streams (as opposed to names of upstream spouts/bolts).

The below example joins two named streams from four spouts:

new JoinBolt(JoinBolt.Selector.STREAM,  "stream1", "key1") 
                             .join     ("stream2", "userId",  "stream1" )
                             .select ("userId, key1, key2")
                             .withTumblingWindow( new Duration(10, TimeUnit.MINUTES) ) ;

topoBuilder.setBolt("joiner", jbolt, 1)
            .fieldsGrouping("bolt1", "stream1", new Fields("key1") )
            .fieldsGrouping("bolt2", "stream1", new Fields("key1") )
            .fieldsGrouping("bolt3", "stream2", new Fields("userId") )
            .fieldsGrouping("bolt4", "stream1", new Fields("key1") );

In the above example, it is possible that bolt1, for example, is emitting other streams also. But the join bolt is only subscribing to stream1 & stream2 from the different bolts. stream1 from bolt1, bolt2 and bolt4 is treated as a single stream and joined against stream2 from bolt3.

Limitations:

  1. Currently only INNER and LEFT joins are supported.

  2. Unlike SQL, which allows joining the same table on different keys to different tables, here the same one field must be used on a stream. Fields Grouping ensures the right tuples are routed to the right instances of a Join Bolt. Consequently the FieldsGrouping field must be same as the join field, for correct results. To perform joins on multiple fields, the fields can be combined into one field and then sent to the Join bolt.

Tips:

  1. Joins can be CPU and memory intensive. The larger the data accumulated in the current window (proportional to window length), the longer it takes to do the join. Having a short sliding interval (few seconds for example) triggers frequent joins. Consequently performance can suffer if using large window lengths or small sliding intervals or both.

  2. Duplication of joined records across windows is possible when using Sliding Windows. This is because the tuples continue to exist across multiple windows when using Sliding Windows.

  3. If message timeouts are enabled, ensure the timeout setting (topology.message.timeout.secs) is large enough to comfortably accommodate the window size, plus the additional processing by other spouts and bolts.

  4. Joining a window of two streams with M and N elements each, in the worst case, can produce MxN elements with every output tuple anchored to one tuple from each input stream. This can mean a lot of output tuples from JoinBolt and even more ACKs for downstream bolts to emit. This can place a substantial pressure on the messaging system and dramatically slowdown the topology if not careful. To manage the load on the messaging subsystem, it is advisable to:

    • Increase the worker's heap (topology.worker.max.heap.size.mb).
    • If ACKing is not necessary for your topology, disable ACKers (topology.acker.executors=0).
    • Disable event logger (topology.eventlogger.executors=0).
    • Turn of topology debugging (topology.debug=false).
    • Set topology.max.spout.pending to a value large enough to accommodate an estimated full window worth of tuples plus some more for headroom. This helps mitigate the possibility of spouts emitting excessive tuples when messaging subsystem is experiencing excessive load. This situation can occur when its value is set to null.
    • Lastly, keep the window size to the minimum value necessary for solving the problem at hand.