Package org.apache.storm.windowing
Class WindowManager<T>
- java.lang.Object
-
- org.apache.storm.windowing.WindowManager<T>
-
- Type Parameters:
T
- the type of event in the window.
- All Implemented Interfaces:
TriggerHandler
- Direct Known Subclasses:
StatefulWindowManager
public class WindowManager<T> extends Object implements TriggerHandler
Tracks a window of events and fires WindowLifecycleListener callbacks on expiry of events or activation of the window due to TriggerPolicy.
-
-
Field Summary
Modifier and Type   Field   Description
protected EvictionPolicy<T,?> evictionPolicy
static int EXPIRE_EVENTS_THRESHOLD
Expire old events every EXPIRE_EVENTS_THRESHOLD to keep the window size in check.
protected Collection<Event<T>> queue
protected TriggerPolicy<T,?> triggerPolicy
protected WindowLifecycleListener<T> windowLifecycleListener
-
Constructor Summary
Constructor   Description
WindowManager(WindowLifecycleListener<T> lifecycleListener)
WindowManager(WindowLifecycleListener<T> lifecycleListener, Collection<Event<T>> queue)
Constructs a WindowManager.
-
Method Summary
Modifier and Type   Method   Description
void add(Event<T> windowEvent)
Tracks a window event.
void add(T event)
Add an event into the window, with System.currentTimeMillis() as the tracking ts.
void add(T event, long ts)
Add an event into the window, with the given ts as the tracking ts.
protected void compactWindow()
Expires events that fall out of the window every EXPIRE_EVENTS_THRESHOLD so that the window does not grow too big.
long getEarliestEventTs(long startTs, long endTs)
Scans the event queue and returns the next earliest event ts between the startTs and endTs.
int getEventCount(long referenceTime)
Scans the event queue and returns the number of events having a timestamp less than or equal to the reference time.
List<Long> getSlidingCountTimestamps(long startTs, long endTs, int slidingCount)
Scans the event queue and returns the list of event ts falling between startTs (exclusive) and endTs (inclusive) at each sliding interval count.
Map<String,Optional<?>> getState()
boolean onTrigger()
The callback invoked by the trigger policy.
void restoreState(Map<String,Optional<?>> state)
void setEvictionPolicy(EvictionPolicy<T,?> evictionPolicy)
void setTriggerPolicy(TriggerPolicy<T,?> triggerPolicy)
void shutdown()
String toString()
-
-
-
Field Detail
-
EXPIRE_EVENTS_THRESHOLD
public static final int EXPIRE_EVENTS_THRESHOLD
Expire old events every EXPIRE_EVENTS_THRESHOLD to keep the window size in check.
Note that if the eviction policy is based on watermarks, events will not be evicted until a new watermark would cause them to be considered expired anyway, regardless of this limit.
- See Also:
- Constant Field Values
-
queue
protected final Collection<Event<T>> queue
-
windowLifecycleListener
protected final WindowLifecycleListener<T> windowLifecycleListener
-
evictionPolicy
protected EvictionPolicy<T,?> evictionPolicy
-
triggerPolicy
protected TriggerPolicy<T,?> triggerPolicy
-
-
Constructor Detail
-
WindowManager
public WindowManager(WindowLifecycleListener<T> lifecycleListener)
-
WindowManager
public WindowManager(WindowLifecycleListener<T> lifecycleListener, Collection<Event<T>> queue)
Constructs a WindowManager.
- Parameters:
lifecycleListener - the WindowLifecycleListener
queue - a collection where the events in the window can be enqueued.
Note: This collection has to be thread safe.
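
The thread-safety requirement on the queue can be illustrated with a minimal sketch. It assumes java.util.concurrent.ConcurrentLinkedQueue as one standard-library collection that satisfies it; SketchEvent is a hypothetical stand-in for Event<T>, and ThreadSafeQueueSketch is not part of Storm.

```java
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for org.apache.storm.windowing.Event<T>; not the Storm type.
final class SketchEvent<T> {
    final T value;
    final long timestamp;

    SketchEvent(T value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

final class ThreadSafeQueueSketch {
    // Builds a thread-safe collection of the kind the queue parameter calls for.
    static <T> Collection<SketchEvent<T>> newQueue() {
        return new ConcurrentLinkedQueue<>();
    }

    // Adds perThread events from each of `threads` threads and returns the final size.
    static int concurrentAdds(int threads, int perThread) {
        Collection<SketchEvent<Integer>> queue = newQueue();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    queue.add(new SketchEvent<>(i, System.currentTimeMillis()));
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        return queue.size();
    }
}
```

With a non-thread-safe collection such as ArrayList, the same concurrent adds could lose events or throw, which is why the constructor places this requirement on the caller.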
-
-
Method Detail
-
setEvictionPolicy
public void setEvictionPolicy(EvictionPolicy<T,?> evictionPolicy)
-
setTriggerPolicy
public void setTriggerPolicy(TriggerPolicy<T,?> triggerPolicy)
-
add
public void add(T event)
Add an event into the window, with System.currentTimeMillis() as the tracking ts.
- Parameters:
event - the event to add
-
add
public void add(T event, long ts)
Add an event into the window, with the given ts as the tracking ts.
- Parameters:
event - the event to track
ts - the timestamp
-
add
public void add(Event<T> windowEvent)
Tracks a window event.
- Parameters:
windowEvent - the window event to track
-
onTrigger
public boolean onTrigger()
The callback invoked by the trigger policy.
- Specified by:
onTrigger in interface TriggerHandler
- Returns:
true if the window was evaluated with at least one event in the window, false otherwise
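
The return contract above can be modelled with a small, self-contained sketch. It is not Storm's implementation: SketchListener is a hypothetical listener whose method signatures are simplified and do not match WindowLifecycleListener, and the count-based eviction limit (maxSize) is an assumption made only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical listener; signatures are simplified and are NOT Storm's WindowLifecycleListener.
interface SketchListener<T> {
    void onExpiry(List<T> expired);

    void onActivation(List<T> events);
}

final class TriggerSketch<T> {
    private final List<T> window = new ArrayList<>();
    private final int maxSize; // assumed count-based eviction limit
    private final SketchListener<T> listener;

    TriggerSketch(int maxSize, SketchListener<T> listener) {
        this.maxSize = maxSize;
        this.listener = listener;
    }

    void add(T event) {
        window.add(event);
    }

    // Evicts the oldest events beyond maxSize, then activates the window if it is non-empty.
    boolean onTrigger() {
        List<T> expired = new ArrayList<>();
        while (window.size() > maxSize) {
            expired.add(window.remove(0));
        }
        if (!expired.isEmpty()) {
            listener.onExpiry(expired);
        }
        if (window.isEmpty()) {
            return false; // nothing to evaluate
        }
        listener.onActivation(new ArrayList<>(window));
        return true; // evaluated with at least one event
    }
}
```

In this sketch, a trigger on an empty window returns false without calling the listener, while a trigger on a window holding events fires the expiry callback (if anything was evicted) before the activation callback.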
-
shutdown
public void shutdown()
-
compactWindow
protected void compactWindow()
Expires events that fall out of the window every EXPIRE_EVENTS_THRESHOLD so that the window does not grow too big.
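
A minimal sketch of threshold-driven compaction follows. It assumes the threshold counts events added since the last expiry pass, that eviction is purely time-based, and that timestamps arrive in non-decreasing order. CompactingWindowSketch and its constructor parameters are hypothetical, and the threshold is passed in rather than taken from EXPIRE_EVENTS_THRESHOLD.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class CompactingWindowSketch {
    private final Deque<Long> timestamps = new ArrayDeque<>();
    private final int threshold;       // hypothetical stand-in for EXPIRE_EVENTS_THRESHOLD
    private final long windowLengthMs; // assumed time-based window length
    private int sinceLastExpiry = 0;
    private int compactions = 0;

    CompactingWindowSketch(int threshold, long windowLengthMs) {
        this.threshold = threshold;
        this.windowLengthMs = windowLengthMs;
    }

    // Tracks the event timestamp and compacts once `threshold` events have been added.
    void add(long ts) {
        timestamps.addLast(ts);
        if (++sinceLastExpiry >= threshold) {
            compact(ts);
        }
    }

    // Drops timestamps that are at least windowLengthMs older than `now`.
    private void compact(long now) {
        while (!timestamps.isEmpty() && now - timestamps.peekFirst() >= windowLengthMs) {
            timestamps.removeFirst();
        }
        sinceLastExpiry = 0;
        compactions++;
    }

    int size() {
        return timestamps.size();
    }

    int compactions() {
        return compactions;
    }
}
```

The sketch is single-threaded and keeps only timestamps; it is meant to show why periodic expiry bounds the queue between triggers, not how a watermark-based eviction policy behaves.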
-
getEarliestEventTs
public long getEarliestEventTs(long startTs, long endTs)
Scans the event queue and returns the next earliest event ts between the startTs and endTs.
- Parameters:
startTs - the start ts (exclusive)
endTs - the end ts (inclusive)
- Returns:
the earliest event ts between startTs and endTs
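
The exclusive/inclusive bounds can be sketched over a plain list of timestamps. EarliestTsSketch is a hypothetical helper, not the Storm method, and returning Long.MAX_VALUE when no event falls in the range is an assumption of this sketch; the documentation above does not specify that case.

```java
import java.util.List;

final class EarliestTsSketch {
    // Returns the smallest ts with startTs < ts <= endTs, or Long.MAX_VALUE if none (sketch assumption).
    static long earliestTs(List<Long> timestamps, long startTs, long endTs) {
        long minTs = Long.MAX_VALUE;
        for (long ts : timestamps) {
            if (ts > startTs && ts <= endTs) {
                minTs = Math.min(minTs, ts);
            }
        }
        return minTs;
    }
}
```

For timestamps 5, 10, 15 and 20, a call with startTs = 5 and endTs = 20 skips 5 (exclusive start) and yields 10.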
-
getEventCount
public int getEventCount(long referenceTime)
Scans the event queue and returns the number of events having a timestamp less than or equal to the reference time.
- Parameters:
referenceTime - the reference timestamp in millis
- Returns:
the count of events with timestamp less than or equal to referenceTime
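
The counting rule can be shown with a short sketch over a plain list of timestamps; EventCountSketch is a hypothetical helper, not the Storm method.

```java
import java.util.List;

final class EventCountSketch {
    // Counts timestamps that are less than or equal to referenceTime.
    static int eventCount(List<Long> timestamps, long referenceTime) {
        int count = 0;
        for (long ts : timestamps) {
            if (ts <= referenceTime) {
                count++;
            }
        }
        return count;
    }
}
```

For timestamps 100, 200 and 300, a reference time of 200 gives a count of 2, because the comparison is inclusive.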
-
getSlidingCountTimestamps
public List<Long> getSlidingCountTimestamps(long startTs, long endTs, int slidingCount)
Scans the event queue and returns the list of event ts falling between startTs (exclusive) and endTs (inclusive) at each sliding interval count.
- Parameters:
startTs - the start timestamp (exclusive)
endTs - the end timestamp (inclusive)
slidingCount - the sliding interval count
- Returns:
the list of event ts
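
One reading of "at each sliding interval count" is sketched below: scan the events in queue order and, every slidingCount in-range events, record the latest timestamp seen so far. That interpretation, and the helper SlidingCountSketch itself, are assumptions for illustration rather than a statement of how Storm computes the result.

```java
import java.util.ArrayList;
import java.util.List;

final class SlidingCountSketch {
    // Records the max ts seen so far after every slidingCount events with startTs < ts <= endTs.
    static List<Long> slidingCountTimestamps(List<Long> timestamps, long startTs, long endTs,
                                             int slidingCount) {
        List<Long> result = new ArrayList<>();
        if (endTs <= startTs) {
            return result; // empty range
        }
        int count = 0;
        long latest = Long.MIN_VALUE;
        for (long ts : timestamps) {
            if (ts > startTs && ts <= endTs) {
                latest = Math.max(latest, ts);
                if (++count % slidingCount == 0) {
                    result.add(latest);
                }
            }
        }
        return result;
    }
}
```

For timestamps 1 through 7 with startTs = 0, endTs = 6 and slidingCount = 2, the sketch yields 2, 4 and 6: one timestamp per pair of in-range events, with 7 excluded by the inclusive end bound.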