Package org.apache.storm.bolt
Class JoinBolt
- java.lang.Object
-
- org.apache.storm.topology.base.BaseWindowedBolt
-
- org.apache.storm.bolt.JoinBolt
-
- All Implemented Interfaces:
Serializable, IComponent, IWindowedBolt
public class JoinBolt extends BaseWindowedBolt
- See Also:
- Serialized Form
-
-
Nested Class Summary
Nested Classes (Modifier and Type, Class, Description)
protected static class JoinBolt.FieldSelector
protected class JoinBolt.JoinAccumulator
protected static class JoinBolt.JoinInfo
- Describes how to join the other stream with the current stream.
protected static class JoinBolt.JoinType
protected class JoinBolt.ResultRecord
static class JoinBolt.Selector
-
Nested classes/interfaces inherited from class org.apache.storm.topology.base.BaseWindowedBolt
BaseWindowedBolt.Count, BaseWindowedBolt.Duration
-
-
Field Summary
Fields (Modifier and Type, Field, Description)
protected LinkedHashMap<String,JoinBolt.JoinInfo> joinCriteria
protected JoinBolt.FieldSelector[] outputFields
protected String outputStreamName
protected JoinBolt.Selector selectorType
-
Fields inherited from class org.apache.storm.topology.base.BaseWindowedBolt
timestampExtractor, windowConfiguration
-
-
Method Summary
All Methods, Instance Methods, Concrete Methods (Modifier and Type, Method, Description)
void declareOutputFields(OutputFieldsDeclarer declarer)
- Declare the output schema for all the streams of this topology.
protected JoinBolt.JoinAccumulator doInnerJoin(JoinBolt.JoinAccumulator probe, Map<Object,ArrayList<Tuple>> buildInput, JoinBolt.JoinInfo joinInfo, boolean finalJoin)
protected JoinBolt.JoinAccumulator doJoin(JoinBolt.JoinAccumulator probe, HashMap<Object,ArrayList<Tuple>> buildInput, JoinBolt.JoinInfo joinInfo, boolean finalJoin)
protected JoinBolt.JoinAccumulator doLeftJoin(JoinBolt.JoinAccumulator probe, Map<Object,ArrayList<Tuple>> buildInput, JoinBolt.JoinInfo joinInfo, boolean finalJoin)
protected ArrayList<Object> doProjection(ArrayList<Tuple> tuples, JoinBolt.FieldSelector[] projectionFields)
void execute(TupleWindow inputWindow)
- Process the tuple window and optionally emit new tuples based on the tuples in the input window.
protected JoinBolt.JoinAccumulator hashJoin(List<Tuple> tuples)
JoinBolt join(String newStream, String field, String priorStream)
- Performs inner Join with the newStream.
JoinBolt leftJoin(String newStream, String field, String priorStream)
- Performs left Join with the newStream.
protected Object lookupField(JoinBolt.FieldSelector fieldSelector, Tuple tuple)
void prepare(Map<String,Object> topoConf, TopologyContext context, OutputCollector collector)
- This is similar to the IBolt.prepare(Map, TopologyContext, OutputCollector) except that while emitting, the tuples are automatically anchored to the tuples in the inputWindow.
JoinBolt select(String commaSeparatedKeys)
- Specify projection fields.
BaseWindowedBolt withLag(BaseWindowedBolt.Duration duration)
- Specify the maximum time lag of the tuple timestamp in milliseconds.
JoinBolt withLateTupleStream(String streamId)
- Specify a stream id on which late tuples are going to be emitted.
JoinBolt withOutputStream(String streamName)
- Optional.
JoinBolt withTimestampExtractor(TimestampExtractor timestampExtractor)
- Specify the timestamp extractor implementation.
JoinBolt withTimestampField(String fieldName)
- Specify a field in the tuple that represents the timestamp as a long value.
JoinBolt withTumblingWindow(BaseWindowedBolt.Count count)
- A count based tumbling window.
JoinBolt withTumblingWindow(BaseWindowedBolt.Duration duration)
- A time duration based tumbling window.
BaseWindowedBolt withWatermarkInterval(BaseWindowedBolt.Duration interval)
- Specify the watermark event generation interval.
JoinBolt withWindow(BaseWindowedBolt.Count windowLength)
- A tuple count based window that slides with every incoming tuple.
JoinBolt withWindow(BaseWindowedBolt.Count windowLength, BaseWindowedBolt.Count slidingInterval)
- Tuple count based sliding window configuration.
JoinBolt withWindow(BaseWindowedBolt.Count windowLength, BaseWindowedBolt.Duration slidingInterval)
- Tuple count and time duration based sliding window configuration.
JoinBolt withWindow(BaseWindowedBolt.Duration windowLength)
- A time duration based window that slides with every incoming tuple.
JoinBolt withWindow(BaseWindowedBolt.Duration windowLength, BaseWindowedBolt.Count slidingInterval)
- Time duration and count based sliding window configuration.
JoinBolt withWindow(BaseWindowedBolt.Duration windowLength, BaseWindowedBolt.Duration slidingInterval)
- Time duration based sliding window configuration.
-
Methods inherited from class org.apache.storm.topology.base.BaseWindowedBolt
cleanup, getComponentConfiguration, getTimestampExtractor
-
-
-
-
Field Detail
-
selectorType
protected final JoinBolt.Selector selectorType
-
joinCriteria
protected LinkedHashMap<String,JoinBolt.JoinInfo> joinCriteria
-
outputFields
protected JoinBolt.FieldSelector[] outputFields
-
outputStreamName
protected String outputStreamName
-
-
Constructor Detail
-
JoinBolt
public JoinBolt(String sourceId, String fieldName)
Calls JoinBolt(Selector.SOURCE, sourceId, fieldName).
- Parameters:
sourceId
- Id of source component (spout/bolt) from which this bolt is receiving data
fieldName
- the field to use for joining the stream (x.y.z format)
-
JoinBolt
public JoinBolt(JoinBolt.Selector type, String srcOrStreamId, String fieldName)
Introduces the first stream to start the join with. Equivalent SQL: select ... from srcOrStreamId ...
- Parameters:
type
- Specifies whether 'srcOrStreamId' refers to a stream name or a source component
srcOrStreamId
- name of stream OR source component
fieldName
- the field to use for joining the stream (x.y.z format)
-
-
Method Detail
-
withOutputStream
public JoinBolt withOutputStream(String streamName)
Optional. Allows naming the output stream of this bolt. If not specified, tuples are emitted on the 'default' stream.
-
join
public JoinBolt join(String newStream, String field, String priorStream)
Performs an inner join with newStream.
SQL: from priorStream inner join newStream on newStream.field = priorStream.field1
Same as: new JoinBolt(priorStream, field1).join(newStream, field, priorStream);
Note: priorStream must have been joined previously.
Valid example: new JoinBolt(s1, k1).join(s2, k2, s1).join(s3, k3, s2);
Invalid example: new JoinBolt(s1, k1).join(s3, k3, s2).join(s2, k2, s1);
- Parameters:
newStream
- Either stream name or name of upstream component
field
- the field on which to perform the join
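The inner-join semantics described above can be illustrated without Storm. The following is a minimal sketch, not the bolt's actual implementation: the class `InnerJoinSketch`, its method, and the use of plain `Map<String, Object>` rows in place of Storm tuples are all assumptions made for illustration. It indexes the new stream by its join field (build phase) and then looks up each prior-stream row in that index (probe phase).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-alone sketch; rows are plain maps, not Storm tuples. */
final class InnerJoinSketch {

    /** Returns one merged row per pair where prior[priorField] equals next[newField]. */
    static List<Map<String, Object>> innerJoin(List<Map<String, Object>> priorRows, String priorField,
                                               List<Map<String, Object>> newRows, String newField) {
        // Build phase: index the new stream's rows by their join key.
        Map<Object, List<Map<String, Object>>> index = new HashMap<>();
        for (Map<String, Object> row : newRows) {
            Object key = row.get(newField);
            if (key != null) {
                index.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
            }
        }
        // Probe phase: each prior row is matched against the index.
        List<Map<String, Object>> joined = new ArrayList<>();
        for (Map<String, Object> probe : priorRows) {
            Object key = probe.get(priorField);
            List<Map<String, Object>> matches = (key == null) ? null : index.get(key);
            if (matches == null) {
                continue; // inner join: rows without a match are dropped
            }
            for (Map<String, Object> match : matches) {
                Map<String, Object> merged = new HashMap<>(probe);
                merged.putAll(match); // same-named columns are overwritten by the new stream
                joined.add(merged);
            }
        }
        return joined;
    }
}
```

In this sketch a prior-stream row with two matches yields two output rows, and a row with no match yields none.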
-
leftJoin
public JoinBolt leftJoin(String newStream, String field, String priorStream)
Performs a left join with newStream.
SQL: from stream1 left join stream2 on stream2.field = stream1.field1
Same as: new JoinBolt(stream1, field1).leftJoin(stream2, field, stream1);
Note: priorStream must have been joined previously.
Valid example: new JoinBolt(s1, k1).leftJoin(s2, k2, s1).leftJoin(s3, k3, s2);
Invalid example: new JoinBolt(s1, k1).leftJoin(s3, k3, s2).leftJoin(s2, k2, s1);
- Parameters:
newStream
- Either a name of a stream or an upstream component
field
- the field on which to perform the join
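To contrast a left join with an inner join, here is a minimal stand-alone sketch. It is not the bolt's implementation: `LeftJoinSketch` and its method are hypothetical names, rows are plain maps rather than Storm tuples, and the sketch assumes that fields from an unmatched right side are simply absent (so a lookup returns null).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-alone sketch; rows are plain maps, not Storm tuples. */
final class LeftJoinSketch {

    /** Keeps every left row; merges in right rows where left[leftField] equals right[rightField]. */
    static List<Map<String, Object>> leftJoin(List<Map<String, Object>> leftRows, String leftField,
                                              List<Map<String, Object>> rightRows, String rightField) {
        // Index the right side by its join key.
        Map<Object, List<Map<String, Object>>> index = new HashMap<>();
        for (Map<String, Object> row : rightRows) {
            Object key = row.get(rightField);
            if (key != null) {
                index.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
            }
        }
        List<Map<String, Object>> joined = new ArrayList<>();
        for (Map<String, Object> left : leftRows) {
            Object key = left.get(leftField);
            List<Map<String, Object>> matches = (key == null) ? null : index.get(key);
            if (matches == null) {
                // Left join: an unmatched left row is still emitted, without right-side fields.
                joined.add(new HashMap<>(left));
                continue;
            }
            for (Map<String, Object> match : matches) {
                Map<String, Object> merged = new HashMap<>(left);
                merged.putAll(match);
                joined.add(merged);
            }
        }
        return joined;
    }
}
```

The only difference from an inner join in this sketch is the branch for unmatched left rows, which are emitted instead of dropped.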
-
select
public JoinBolt select(String commaSeparatedKeys)
Specify projection fields, i.e. the fields to include in the output.
e.g.: .select("field1, stream2:field2, field3")
Nested key names are supported for nested types, e.g.: .select("outerKey1.innerKey1, outerKey1.innerKey2, stream3:outerKey2.innerKey3")
Inner types (non-leaf) must be Map<> in order to support nested lookup using this dot notation.
The selected fields implicitly declare the output field names for the bolt.
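The selector syntax above combines an optional stream prefix with a dot-separated path through nested maps. The sketch below shows one way such a selector could be split and resolved. `NestedKeySketch`, `splitSelector`, and `lookup` are hypothetical names, and returning null for a missing or non-Map step is an assumption of this sketch, not documented behavior of the bolt.

```java
import java.util.Map;

/** Hypothetical stand-alone sketch of "stream:outer.inner" style selectors. */
final class NestedKeySketch {

    /** Splits "stream:field" into {stream, field}; stream is null when there is no prefix. */
    static String[] splitSelector(String selector) {
        String s = selector.trim();
        int colon = s.indexOf(':');
        if (colon < 0) {
            return new String[] {null, s};
        }
        return new String[] {s.substring(0, colon).trim(), s.substring(colon + 1).trim()};
    }

    /** Walks a dot-separated key through nested maps; null if a step is missing or not a Map. */
    static Object lookup(Map<String, ?> root, String dottedKey) {
        Object current = root;
        for (String part : dottedKey.trim().split("\\.")) {
            if (!(current instanceof Map)) {
                return null; // non-leaf values must be maps for dot notation to work
            }
            current = ((Map<?, ?>) current).get(part);
        }
        return current;
    }
}
```

For example, resolving `outerKey1.innerKey1` first fetches the map stored under `outerKey1` and then reads `innerKey1` from it.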
-
declareOutputFields
public void declareOutputFields(OutputFieldsDeclarer declarer)
Description copied from interface: IComponent
Declare the output schema for all the streams of this topology.
- Specified by:
declareOutputFields in interface IComponent
- Overrides:
declareOutputFields in class BaseWindowedBolt
- Parameters:
declarer
- this is used to declare output stream ids, output fields, and whether or not each output stream is a direct stream
-
prepare
public void prepare(Map<String,Object> topoConf, TopologyContext context, OutputCollector collector)
Description copied from interface: IWindowedBolt
This is similar to the IBolt.prepare(Map, TopologyContext, OutputCollector) except that while emitting, the tuples are automatically anchored to the tuples in the inputWindow.
- Specified by:
prepare in interface IWindowedBolt
- Overrides:
prepare in class BaseWindowedBolt
-
execute
public void execute(TupleWindow inputWindow)
Description copied from interface: IWindowedBolt
Process the tuple window and optionally emit new tuples based on the tuples in the input window.
-
hashJoin
protected JoinBolt.JoinAccumulator hashJoin(List<Tuple> tuples)
-
doJoin
protected JoinBolt.JoinAccumulator doJoin(JoinBolt.JoinAccumulator probe, HashMap<Object,ArrayList<Tuple>> buildInput, JoinBolt.JoinInfo joinInfo, boolean finalJoin)
-
doInnerJoin
protected JoinBolt.JoinAccumulator doInnerJoin(JoinBolt.JoinAccumulator probe, Map<Object,ArrayList<Tuple>> buildInput, JoinBolt.JoinInfo joinInfo, boolean finalJoin)
-
doLeftJoin
protected JoinBolt.JoinAccumulator doLeftJoin(JoinBolt.JoinAccumulator probe, Map<Object,ArrayList<Tuple>> buildInput, JoinBolt.JoinInfo joinInfo, boolean finalJoin)
-
doProjection
protected ArrayList<Object> doProjection(ArrayList<Tuple> tuples, JoinBolt.FieldSelector[] projectionFields)
-
lookupField
protected Object lookupField(JoinBolt.FieldSelector fieldSelector, Tuple tuple)
-
withWindow
public JoinBolt withWindow(BaseWindowedBolt.Count windowLength, BaseWindowedBolt.Count slidingInterval)
Description copied from class: BaseWindowedBolt
Tuple count based sliding window configuration.
- Overrides:
withWindow in class BaseWindowedBolt
- Parameters:
windowLength
- the number of tuples in the window
slidingInterval
- the number of tuples after which the window slides
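To make the interaction of windowLength and slidingInterval concrete, the sketch below computes which tuples fall into each window for a count-based configuration. It is a simplified model, not Storm's windowing code: `CountWindowSketch` is a hypothetical name, and the sketch assumes a window fires after every slidingInterval tuples and contains at most the last windowLength tuples, so early windows may be shorter.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of a count-based sliding window over an in-memory list. */
final class CountWindowSketch {

    /** Returns the contents of each window fired while consuming the events in order. */
    static List<List<Integer>> windows(List<Integer> events, int windowLength, int slidingInterval) {
        if (windowLength <= 0 || slidingInterval <= 0) {
            throw new IllegalArgumentException("windowLength and slidingInterval must be positive");
        }
        List<List<Integer>> fired = new ArrayList<>();
        // A window fires each time slidingInterval more tuples have arrived.
        for (int end = slidingInterval; end <= events.size(); end += slidingInterval) {
            int start = Math.max(0, end - windowLength); // keep only the last windowLength tuples
            fired.add(new ArrayList<>(events.subList(start, end)));
        }
        return fired;
    }
}
```

With six events, windowLength 4 and slidingInterval 2, this model yields windows ending at the second, fourth and sixth event. Setting slidingInterval equal to windowLength gives non-overlapping windows, which is the tumbling case.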
-
withWindow
public JoinBolt withWindow(BaseWindowedBolt.Count windowLength, BaseWindowedBolt.Duration slidingInterval)
Description copied from class: BaseWindowedBolt
Tuple count and time duration based sliding window configuration.
- Overrides:
withWindow in class BaseWindowedBolt
- Parameters:
windowLength
- the number of tuples in the window
slidingInterval
- the time duration after which the window slides
-
withWindow
public JoinBolt withWindow(BaseWindowedBolt.Duration windowLength, BaseWindowedBolt.Count slidingInterval)
Description copied from class: BaseWindowedBolt
Time duration and count based sliding window configuration.
- Overrides:
withWindow in class BaseWindowedBolt
- Parameters:
windowLength
- the time duration of the window
slidingInterval
- the number of tuples after which the window slides
-
withWindow
public JoinBolt withWindow(BaseWindowedBolt.Duration windowLength, BaseWindowedBolt.Duration slidingInterval)
Description copied from class: BaseWindowedBolt
Time duration based sliding window configuration.
- Overrides:
withWindow in class BaseWindowedBolt
- Parameters:
windowLength
- the time duration of the window
slidingInterval
- the time duration after which the window slides
-
withWindow
public JoinBolt withWindow(BaseWindowedBolt.Count windowLength)
Description copied from class: BaseWindowedBolt
A tuple count based window that slides with every incoming tuple.
- Overrides:
withWindow in class BaseWindowedBolt
- Parameters:
windowLength
- the number of tuples in the window
-
withWindow
public JoinBolt withWindow(BaseWindowedBolt.Duration windowLength)
Description copied from class: BaseWindowedBolt
A time duration based window that slides with every incoming tuple.
- Overrides:
withWindow in class BaseWindowedBolt
- Parameters:
windowLength
- the time duration of the window
-
withTumblingWindow
public JoinBolt withTumblingWindow(BaseWindowedBolt.Count count)
Description copied from class: BaseWindowedBolt
A count based tumbling window.
- Overrides:
withTumblingWindow in class BaseWindowedBolt
- Parameters:
count
- the number of tuples after which the window tumbles
-
withTumblingWindow
public JoinBolt withTumblingWindow(BaseWindowedBolt.Duration duration)
Description copied from class: BaseWindowedBolt
A time duration based tumbling window.
- Overrides:
withTumblingWindow in class BaseWindowedBolt
- Parameters:
duration
- the time duration after which the window tumbles
-
withTimestampField
public JoinBolt withTimestampField(String fieldName)
Description copied from class: BaseWindowedBolt
Specify a field in the tuple that represents the timestamp as a long value. If this field is not present in the incoming tuple, an IllegalArgumentException will be thrown. The field MUST contain a timestamp in milliseconds.
- Overrides:
withTimestampField in class BaseWindowedBolt
- Parameters:
fieldName
- the name of the field that contains the timestamp
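The contract above (a long timestamp in milliseconds, IllegalArgumentException when the field is absent) can be mirrored in a few lines. This is only an illustrative sketch: `TimestampFieldSketch` and `extract` are hypothetical names, the tuple is modeled as a plain map, and rejecting non-numeric values with the same exception is an assumption of this sketch.

```java
import java.util.Map;

/** Hypothetical sketch of reading an event timestamp from a named field. */
final class TimestampFieldSketch {

    /** Returns the timestamp in milliseconds stored under fieldName. */
    static long extract(Map<String, ?> tuple, String fieldName) {
        Object value = tuple.get(fieldName);
        if (!(value instanceof Number)) {
            // Covers both a missing field and a value that is not numeric (assumption).
            throw new IllegalArgumentException("No numeric timestamp in field '" + fieldName + "'");
        }
        return ((Number) value).longValue();
    }
}
```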
-
withTimestampExtractor
public JoinBolt withTimestampExtractor(TimestampExtractor timestampExtractor)
Description copied from class: BaseWindowedBolt
Specify the timestamp extractor implementation.
- Overrides:
withTimestampExtractor in class BaseWindowedBolt
- Parameters:
timestampExtractor
- the TimestampExtractor implementation
-
withLateTupleStream
public JoinBolt withLateTupleStream(String streamId)
Description copied from class: BaseWindowedBolt
Specify a stream id on which late tuples are going to be emitted. They are going to be accessible via the WindowedBoltExecutor.LATE_TUPLE_FIELD field. It must be defined on a per-component basis, and in conjunction with the BaseWindowedBolt.withTimestampField(java.lang.String), otherwise IllegalArgumentException will be thrown.
- Overrides:
withLateTupleStream in class BaseWindowedBolt
- Parameters:
streamId
- the name of the stream used to emit late tuples on
-
withLag
public BaseWindowedBolt withLag(BaseWindowedBolt.Duration duration)
Description copied from class: BaseWindowedBolt
Specify the maximum time lag of the tuple timestamp in milliseconds. It means that the tuple timestamps cannot be out of order by more than this amount.
- Overrides:
withLag in class BaseWindowedBolt
- Parameters:
duration
- the max lag duration
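The "out of order by no more than the lag" rule can be pictured with a small model. The sketch below is a simplification and not Storm's watermark logic: `LagSketch` is a hypothetical class, it tracks a single stream, and it assumes a tuple counts as late as soon as its timestamp falls more than the lag behind the highest timestamp seen so far. In the real bolt, watermarks are generated periodically, so the exact cut-off can differ.

```java
/** Hypothetical single-stream model of a maximum timestamp lag. */
final class LagSketch {
    private final long maxLagMs;
    private long maxSeenTs = Long.MIN_VALUE;

    LagSketch(long maxLagMs) {
        this.maxLagMs = maxLagMs;
    }

    /** Returns true if the tuple is within the allowed lag, false if it is late. */
    boolean accept(long timestampMs) {
        if (maxSeenTs != Long.MIN_VALUE && timestampMs < maxSeenTs - maxLagMs) {
            return false; // more than maxLagMs behind the newest timestamp seen
        }
        maxSeenTs = Math.max(maxSeenTs, timestampMs);
        return true;
    }
}
```

With a lag of 5 ms and a newest timestamp of 100, this model accepts a tuple stamped 97 and rejects one stamped 94.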
-
withWatermarkInterval
public BaseWindowedBolt withWatermarkInterval(BaseWindowedBolt.Duration interval)
Description copied from class: BaseWindowedBolt
Specify the watermark event generation interval. For tuple based timestamps, watermark events are used to track the progress of time.
- Overrides:
withWatermarkInterval in class BaseWindowedBolt
- Parameters:
interval
- the interval at which watermark events are generated
-
-