Package org.apache.storm.stats
Class StatsUtil
java.lang.Object
    org.apache.storm.stats.StatsUtil
public class StatsUtil extends Object
-
Field Summary
static int TEN_MIN_IN_SECONDS
static String TEN_MIN_IN_SECONDS_STR
static String TYPE
-
Constructor Summary
StatsUtil()
-
Method Summary
static Map<String,Object> aggBoltExecWinStats(Map<String,Object> accStats, Map<String,Object> newStats, boolean includeSys)
Aggregates the windowed stats of a bolt executor with a Map of accumulated stats.
static Map<String,Number> aggBoltLatAndCount(Map<List<String>,Double> id2execAvg, Map<List<String>,Double> id2procAvg, Map<List<String>,Long> id2numExec)
Aggregates the number executed, process latency, and execute latency across all streams.
static <K> Map<K,Map> aggBoltStreamsLatAndCount(Map<K,Double> id2execAvg, Map<K,Double> id2procAvg, Map<K,Long> id2numExec)
Aggregates the number executed and the process and execute latencies.
static ComponentPageInfo aggCompExecsStats(Map exec2hostPort, Map task2component, Map<List<Integer>,Map<String,Object>> beats, String window, boolean includeSys, String topologyId, StormTopology topology, String componentId)
Aggregates component executor stats.
static Map<String,Object> aggCompExecStats(String window, boolean includeSys, Map<String,Object> accStats, Map<String,Object> beat, String compType)
Combines the aggregate stats of one executor with the given map, selecting the appropriate window and including system components as specified.
static Map<String,Object> aggPreMergeCompPageBolt(Map<String,Object> beat, String window, boolean includeSys)
Pre-merges component page bolt stats from an executor heartbeat.
static Map<String,Object> aggPreMergeCompPageSpout(Map<String,Object> beat, String window, boolean includeSys)
Pre-merges component page spout stats from an executor heartbeat.
static <K,V extends Number> Map<String,Object> aggPreMergeTopoPageBolt(Map<String,Object> beat, String window, boolean includeSys)
Pre-merges component stats of the specified bolt id.
static <K,V extends Number> Map<String,Object> aggPreMergeTopoPageSpout(Map<String,Object> m, String window, boolean includeSys)
Pre-merges component stats of the specified spout id and returns {comp id -> comp-stats}.
static <K> Map<String,Map<K,Double>> aggregateAverages(List<Map<String,Map<K,Double>>> avgSeq, List<Map<String,Map<K,Long>>> countSeq)
Computes a weighted average from a list of average maps and the corresponding count maps extracted from a list of ExecutorSummary.
static <K> Map<String,Double> aggregateAvgStreams(Map<String,Map<K,Double>> avgs, Map<String,Map<K,Long>> counts)
Aggregates the weighted average of all streams.
static <T> Map<String,Map> aggregateBoltStats(List<ExecutorSummary> statsSeq, boolean includeSys)
Aggregates bolt stats.
static Map<String,Map> aggregateBoltStreams(Map<String,Map> stats)
Aggregates all bolt streams.
static <T> Map<String,Map<String,Map<T,Long>>> aggregateCommonStats(List<ExecutorSummary> statsSeq)
Aggregates common stats from a spout or bolt; called by aggregateSpoutStats and aggregateBoltStats.
static Map<String,Object> aggregateCompStats(String window, boolean includeSys, List<Map<String,Object>> beats, String compType)
Aggregates the stats for a component over a given window of time.
static <T> Map<String,Map<T,Long>> aggregateCounts(List<Map<String,Map<T,Long>>> countsSeq)
Aggregates a list of count maps into one map.
static <K,V extends Number> Map<String,Long> aggregateCountStreams(Map<String,Map<K,V>> stats)
Aggregates count streams by window.
static Map<String,Map> aggregateSpoutStats(List<ExecutorSummary> statsSeq, boolean includeSys)
Aggregates spout stats.
static Map<String,Map> aggregateSpoutStreams(Map<String,Map> stats)
Aggregates all spout streams.
static Map<String,Object> aggSpoutExecWinStats(Map<String,Object> accStats, Map<String,Object> beat, boolean includeSys)
Aggregates the windowed stats of a spout executor with a Map of accumulated stats.
static Map<String,Number> aggSpoutLatAndCount(Map<String,Double> id2compAvg, Map<String,Long> id2numAcked)
Aggregates the number acked and complete latencies across all streams.
static <K> Map<K,Map> aggSpoutStreamsLatAndCount(Map<K,Double> id2compAvg, Map<K,Long> id2acked)
Aggregates the number acked and complete latencies.
static TopologyPageInfo aggTopoExecsStats(String topologyId, Map exec2nodePort, Map task2component, Map<List<Integer>,Map<String,Object>> beats, StormTopology topology, String window, boolean includeSys, IStormClusterState clusterState)
Aggregates topology executor stats.
static Map<String,Object> aggTopoExecStats(String window, boolean includeSys, Map<String,Object> accStats, Map<String,Object> beat, String compType)
A helper function that does the common work of aggregating the stats of one executor with the given map for the topology page.
static List<WorkerSummary> aggWorkerStats(String stormId, String stormName, Map<Integer,String> task2Component, Map<List<Integer>,Map<String,Object>> beats, Map<List<Long>,List<Object>> exec2NodePort, Map<String,String> nodeHost, Map<WorkerSlot,WorkerResources> worker2Resources, boolean includeSys, boolean userAuthorized, String filterSupervisor, String owner)
Aggregates statistics per worker for a topology.
static Map<String,Map> boltStreamsStats(List<ExecutorSummary> summs, boolean includeSys)
Aggregates bolt stream stats and returns a Map of {metric -> win -> aggregated value}.
static String componentType(StormTopology topology, String compId)
Gets the component type for a given id.
static double computeBoltCapacity(List<ExecutorSummary> executorSumms)
Computes the maximum bolt capacity.
static double computeExecutorCapacity(ExecutorSummary summary)
Computes the capacity of an executor.
static Map<List<Integer>,Map<String,Object>> convertExecutorBeats(Map<ExecutorInfo,ExecutorBeat> beats)
Converts thrift executor heartbeats into a Java HashMap.
static Map<List<Integer>,ExecutorStats> convertExecutorsStats(Map<ExecutorInfo,ExecutorStats> stats)
Converts executor stats into a HashMap; the ExecutorStats values themselves remain unchanged.
static Map<String,Object> convertExecutorStats(ExecutorStats stats)
Converts a thrift ExecutorStats structure into a Java HashMap.
static Map<List<Integer>,Map<String,Object>> convertWorkerBeats(SupervisorWorkerHeartbeat workerHeartbeat)
Converts a SupervisorWorkerHeartbeat into Nimbus local-report executor heartbeats.
static Map<String,Object> convertZkExecutorHb(ExecutorBeat beat)
Converts a thrift ExecutorBeat into a Java HashMap.
static Map<String,Object> convertZkWorkerHb(ClusterWorkerHeartbeat workerHb)
Converts a thrift worker heartbeat into a Java HashMap.
static String errorSubset(String errorStr)
static List<Map<String,Object>> extractDataFromHb(Map executor2hostPort, Map task2component, Map<List<Integer>,Map<String,Object>> beats, boolean includeSys, StormTopology topology)
Extracts a list of executor data from heartbeats.
static List<Map<String,Object>> extractDataFromHb(Map executor2hostPort, Map task2component, Map<List<Integer>,Map<String,Object>> beats, boolean includeSys, StormTopology topology, String compId)
Extracts a list of executor data from heartbeats.
static List<Map<String,Object>> extractNodeInfosFromHbForComp(Map<List<? extends Number>,List<Object>> exec2hostPort, Map<Integer,String> task2component, boolean includeSys, String compId)
Extracts a list of host/port info for the specified component.
static String floatStr(Double n)
Converts a float to a string for display.
static List<ExecutorSummary> getFilledStats(List<ExecutorSummary> summs)
Filters out ExecutorSummary entries whose stats are null.
static Map<String,Object> mergeAggCompStatsCompPageBolt(Map<String,Object> accBoltStats, Map<String,Object> boltStats)
Merges accumulated bolt stats with pre-merged component stats.
static Map<String,Object> mergeAggCompStatsCompPageSpout(Map<String,Object> accSpoutStats, Map<String,Object> spoutStats)
Merges accumulated spout stats with pre-merged component stats.
static Map<String,Object> mergeAggCompStatsTopoPageBolt(Map<String,Object> accBoltStats, Map<String,Object> boltStats)
Merges accumulated bolt stats with new bolt stats.
static Map<String,Object> mergeAggCompStatsTopoPageSpout(Map<String,Object> accSpoutStats, Map<String,Object> spoutStats)
Merges accumulated spout stats with new spout stats.
static Map<String,Object> postAggregateCompStats(Map<String,Object> compStats)
Post-aggregates component stats.
static <T> Map<String,Map<String,Map<T,Long>>> preProcessStreamSummary(Map<String,Map<String,Map<T,Long>>> streamSummary, boolean includeSys)
Filters system streams out of aggregated spout/bolt stats if necessary.
static Map<String,Map> spoutStreamsStats(List<ExecutorSummary> summs, boolean includeSys)
Aggregates spout stream stats and returns a Map of {metric -> win -> aggregated value}.
static ExecutorStats thriftifyExecutorStats(Map stats)
Converts executor stats to a thrift data structure.
static SupervisorWorkerHeartbeat thriftifyRpcWorkerHb(String stormId, List<Long> executorId)
Used for local testing.
static <K> Map windowSetConverter(Map stats, org.apache.storm.stats.ClientStatsUtil.KeyTransformer<K> firstKeyFunc)
-
Field Detail
-
TYPE
public static final String TYPE
- See Also:
- Constant Field Values
-
TEN_MIN_IN_SECONDS
public static final int TEN_MIN_IN_SECONDS
- See Also:
- Constant Field Values
-
TEN_MIN_IN_SECONDS_STR
public static final String TEN_MIN_IN_SECONDS_STR
- See Also:
- Constant Field Values
-
Method Detail
-
aggBoltLatAndCount
public static Map<String,Number> aggBoltLatAndCount(Map<List<String>,Double> id2execAvg, Map<List<String>,Double> id2procAvg, Map<List<String>,Long> id2numExec)
Aggregates the number executed, process latency, and execute latency across all streams.
Parameters:
id2execAvg - {global stream id -> exec avg value}, e.g., {["split" "default"] 0.44313}
id2procAvg - {global stream id -> proc avg value}
id2numExec - {global stream id -> executed}
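The cross-stream aggregation described for aggBoltLatAndCount can be illustrated with a small Java sketch. It assumes, without confirmation from this page, that each stream's average latency is weighted by that stream's executed count, so the result carries latency totals (average * count, summed over streams) that a later step can divide by the total executed count. The class BoltLatAndCountSketch and the output keys executeLatencyTotal, processLatencyTotal, and executed are hypothetical names chosen for illustration, not the Storm implementation.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of cross-stream latency aggregation; not Storm's code.
final class BoltLatAndCountSketch {

    // Sum of (average * count) over every stream id that has both values.
    static <K> double weightedTotal(Map<K, Double> id2avg, Map<K, Long> id2count) {
        double total = 0.0;
        for (Map.Entry<K, Double> e : id2avg.entrySet()) {
            Long count = id2count.get(e.getKey());
            if (count != null && e.getValue() != null) {
                total += e.getValue() * count;
            }
        }
        return total;
    }

    // Aggregates across all streams; the output key names are illustrative only.
    static Map<String, Number> aggBoltLatAndCount(Map<List<String>, Double> id2execAvg,
                                                  Map<List<String>, Double> id2procAvg,
                                                  Map<List<String>, Long> id2numExec) {
        long executed = 0L;
        for (Long n : id2numExec.values()) {
            if (n != null) {
                executed += n;
            }
        }
        Map<String, Number> ret = new HashMap<>();
        ret.put("executeLatencyTotal", weightedTotal(id2execAvg, id2numExec));
        ret.put("processLatencyTotal", weightedTotal(id2procAvg, id2numExec));
        ret.put("executed", executed);
        return ret;
    }
}
```

With two streams averaging 0.5 over 10 executed tuples and 2.0 over 5, the execute-latency total is 15.0 and the executed count is 15, so the overall average is 1.0. Keeping totals rather than averages lets results from several executors be summed before the final division.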
-
aggSpoutLatAndCount
public static Map<String,Number> aggSpoutLatAndCount(Map<String,Double> id2compAvg, Map<String,Long> id2numAcked)
Aggregates the number acked and complete latencies across all streams.
-
aggBoltStreamsLatAndCount
public static <K> Map<K,Map> aggBoltStreamsLatAndCount(Map<K,Double> id2execAvg, Map<K,Double> id2procAvg, Map<K,Long> id2numExec)
Aggregates the number executed and the process and execute latencies for each stream id.
-
aggSpoutStreamsLatAndCount
public static <K> Map<K,Map> aggSpoutStreamsLatAndCount(Map<K,Double> id2compAvg, Map<K,Long> id2acked)
Aggregates the number acked and complete latencies for each stream id.
-
aggPreMergeCompPageBolt
public static Map<String,Object> aggPreMergeCompPageBolt(Map<String,Object> beat, String window, boolean includeSys)
Pre-merges component page bolt stats from an executor heartbeat:
1. computes component capacity
2. converts map keys of stats
3. filters streams if necessary
Parameters:
beat - executor heartbeat data
window - specified window
includeSys - whether to include system streams
Returns:
pre-merged stats
-
aggPreMergeCompPageSpout
public static Map<String,Object> aggPreMergeCompPageSpout(Map<String,Object> beat, String window, boolean includeSys)
Pre-merges component page spout stats from an executor heartbeat:
1. computes component capacity
2. converts map keys of stats
3. filters streams if necessary
Parameters:
beat - executor heartbeat data
window - specified window
includeSys - whether to include system streams
Returns:
pre-merged stats
-
aggPreMergeTopoPageBolt
public static <K,V extends Number> Map<String,Object> aggPreMergeTopoPageBolt(Map<String,Object> beat, String window, boolean includeSys)
Pre-merges component stats of the specified bolt id.
Parameters:
beat - executor heartbeat data
window - specified window
includeSys - whether to include system streams
Returns:
{comp id -> comp-stats}
-
aggPreMergeTopoPageSpout
public static <K,V extends Number> Map<String,Object> aggPreMergeTopoPageSpout(Map<String,Object> m, String window, boolean includeSys)
Pre-merges component stats of the specified spout id and returns {comp id -> comp-stats}.
-
mergeAggCompStatsCompPageBolt
public static Map<String,Object> mergeAggCompStatsCompPageBolt(Map<String,Object> accBoltStats, Map<String,Object> boltStats)
Merges accumulated bolt stats with pre-merged component stats.
Parameters:
accBoltStats - accumulated bolt stats
boltStats - pre-merged component stats
Returns:
merged stats
-
mergeAggCompStatsCompPageSpout
public static Map<String,Object> mergeAggCompStatsCompPageSpout(Map<String,Object> accSpoutStats, Map<String,Object> spoutStats)
Merges accumulated spout stats with pre-merged component stats.
-
mergeAggCompStatsTopoPageBolt
public static Map<String,Object> mergeAggCompStatsTopoPageBolt(Map<String,Object> accBoltStats, Map<String,Object> boltStats)
Merges accumulated bolt stats with new bolt stats.
Parameters:
accBoltStats - accumulated bolt stats
boltStats - new input bolt stats
Returns:
merged bolt stats
-
mergeAggCompStatsTopoPageSpout
public static Map<String,Object> mergeAggCompStatsTopoPageSpout(Map<String,Object> accSpoutStats, Map<String,Object> spoutStats)
Merges accumulated spout stats with new spout stats.
-
aggTopoExecStats
public static Map<String,Object> aggTopoExecStats(String window, boolean includeSys, Map<String,Object> accStats, Map<String,Object> beat, String compType)
A helper function that does the common work of aggregating the stats of one executor with the given map for the topology page.
-
aggTopoExecsStats
public static TopologyPageInfo aggTopoExecsStats(String topologyId, Map exec2nodePort, Map task2component, Map<List<Integer>,Map<String,Object>> beats, StormTopology topology, String window, boolean includeSys, IStormClusterState clusterState)
Aggregates topology executor stats.
Parameters:
topologyId - topology id
exec2nodePort - executor -> host+port
task2component - task -> component
beats - executor[start, end] -> executor heartbeat
topology - storm topology
window - the window to be aggregated
includeSys - whether to include system streams
clusterState - cluster state
Returns:
TopologyPageInfo thrift structure
-
aggregateBoltStats
public static <T> Map<String,Map> aggregateBoltStats(List<ExecutorSummary> statsSeq, boolean includeSys)
Aggregates bolt stats.
Parameters:
statsSeq - a list of ExecutorSummary
includeSys - whether to include system streams
Returns:
aggregated bolt stats: {metric -> win -> global stream id -> value}
-
aggregateSpoutStats
public static Map<String,Map> aggregateSpoutStats(List<ExecutorSummary> statsSeq, boolean includeSys)
Aggregates spout stats.
Parameters:
statsSeq - a list of ExecutorSummary
includeSys - whether to include system streams
Returns:
aggregated spout stats: {metric -> win -> global stream id -> value}
-
aggregateCommonStats
public static <T> Map<String,Map<String,Map<T,Long>>> aggregateCommonStats(List<ExecutorSummary> statsSeq)
Aggregates common stats from a spout or bolt; called by aggregateSpoutStats and aggregateBoltStats.
-
preProcessStreamSummary
public static <T> Map<String,Map<String,Map<T,Long>>> preProcessStreamSummary(Map<String,Map<String,Map<T,Long>>> streamSummary, boolean includeSys)
Filters system streams out of aggregated spout/bolt stats if necessary, that is, when includeSys is false.
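The filtering step can be sketched as follows. This is a minimal illustration under two assumptions not stated on this page: that a stream id counts as a system stream when it starts with "__" (as in names such as "__ack_ack"), and that stream ids are plain Strings rather than GlobalStreamId. The class StreamFilterSketch is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; assumes system stream ids start with "__".
final class StreamFilterSketch {

    static boolean isSystemStream(String streamId) {
        return streamId.startsWith("__");
    }

    // Input shape: {metric -> win -> stream -> value}.
    // When includeSys is true the input is returned as-is; otherwise a filtered copy is built.
    static Map<String, Map<String, Map<String, Long>>> preProcessStreamSummary(
            Map<String, Map<String, Map<String, Long>>> streamSummary, boolean includeSys) {
        if (includeSys) {
            return streamSummary;
        }
        Map<String, Map<String, Map<String, Long>>> out = new HashMap<>();
        for (Map.Entry<String, Map<String, Map<String, Long>>> metric : streamSummary.entrySet()) {
            Map<String, Map<String, Long>> byWin = new HashMap<>();
            for (Map.Entry<String, Map<String, Long>> win : metric.getValue().entrySet()) {
                Map<String, Long> streams = new HashMap<>();
                for (Map.Entry<String, Long> s : win.getValue().entrySet()) {
                    if (!isSystemStream(s.getKey())) {
                        streams.put(s.getKey(), s.getValue());
                    }
                }
                byWin.put(win.getKey(), streams);
            }
            out.put(metric.getKey(), byWin);
        }
        return out;
    }
}
```

Building a copy instead of removing entries in place leaves the caller's accumulated summary intact.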
-
aggregateCountStreams
public static <K,V extends Number> Map<String,Long> aggregateCountStreams(Map<String,Map<K,V>> stats)
Aggregates count streams by window.
Parameters:
stats - a Map of {win -> stream -> value}
Returns:
a Map of {win -> value}
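The {win -> stream -> value} to {win -> value} reduction documented for aggregateCountStreams amounts to summing each window's per-stream counts. The sketch below shows that reduction; the class name is hypothetical, and it assumes the values are whole-number counts, since longValue() drops any fractional part.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: {win -> stream -> value} -> {win -> sum across streams}.
final class CountStreamsSketch {
    static <K, V extends Number> Map<String, Long> aggregateCountStreams(Map<String, Map<K, V>> stats) {
        Map<String, Long> ret = new HashMap<>();
        for (Map.Entry<String, Map<K, V>> win : stats.entrySet()) {
            long sum = 0L;
            for (V v : win.getValue().values()) {
                if (v != null) {
                    sum += v.longValue(); // counts are assumed to be integral
                }
            }
            ret.put(win.getKey(), sum);
        }
        return ret;
    }
}
```

For example, a window "600" holding {"default" -> 40, "errors" -> 2} reduces to 42. The window key used here is illustrative.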
-
aggregateAverages
public static <K> Map<String,Map<K,Double>> aggregateAverages(List<Map<String,Map<K,Double>>> avgSeq, List<Map<String,Map<K,Long>>> countSeq)
Computes a weighted average from a list of average maps and the corresponding count maps extracted from a list of ExecutorSummary.
Parameters:
avgSeq - a list of {win -> global stream id -> avg value}
countSeq - a list of {win -> global stream id -> count value}
Returns:
a Map of {win -> global stream id -> weighted avg value}
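A weighted average across executors, as aggregateAverages describes, can be computed for each (window, stream) pair as sum(avg_i * count_i) / sum(count_i). The sketch below is an illustration under stated assumptions, not Storm's implementation. It assumes avgSeq.get(i) and countSeq.get(i) describe the same executor, treats a missing count as 0, and returns 0.0 when the total count is 0. That last behavior is a choice made for this sketch. The class name is hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; element i of each list is one executor's {win -> stream id -> value}.
final class WeightedAverageSketch {
    static <K> Map<String, Map<K, Double>> aggregateAverages(List<Map<String, Map<K, Double>>> avgSeq,
                                                             List<Map<String, Map<K, Long>>> countSeq) {
        Map<String, Map<K, Double>> weightedSums = new HashMap<>();
        Map<String, Map<K, Long>> totalCounts = new HashMap<>();
        for (int i = 0; i < avgSeq.size(); i++) {
            Map<String, Map<K, Double>> avgs = avgSeq.get(i);
            Map<String, Map<K, Long>> counts = countSeq.get(i);
            for (Map.Entry<String, Map<K, Double>> win : avgs.entrySet()) {
                Map<K, Long> winCounts = counts.getOrDefault(win.getKey(), Map.of());
                for (Map.Entry<K, Double> s : win.getValue().entrySet()) {
                    long n = winCounts.getOrDefault(s.getKey(), 0L);
                    // Accumulate avg * count and count separately per (window, stream).
                    weightedSums.computeIfAbsent(win.getKey(), w -> new HashMap<>())
                        .merge(s.getKey(), s.getValue() * n, Double::sum);
                    totalCounts.computeIfAbsent(win.getKey(), w -> new HashMap<>())
                        .merge(s.getKey(), n, Long::sum);
                }
            }
        }
        Map<String, Map<K, Double>> ret = new HashMap<>();
        for (Map.Entry<String, Map<K, Double>> win : weightedSums.entrySet()) {
            Map<K, Double> out = new HashMap<>();
            for (Map.Entry<K, Double> s : win.getValue().entrySet()) {
                long n = totalCounts.get(win.getKey()).get(s.getKey());
                out.put(s.getKey(), n == 0 ? 0.0 : s.getValue() / n);
            }
            ret.put(win.getKey(), out);
        }
        return ret;
    }
}
```

Two executors reporting averages 1.0 (count 1) and 4.0 (count 3) on the same stream give (1.0 + 12.0) / 4 = 3.25, not the unweighted 2.5.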
-
aggregateAvgStreams
public static <K> Map<String,Double> aggregateAvgStreams(Map<String,Map<K,Double>> avgs, Map<String,Map<K,Long>> counts)
Aggregates the weighted average of all streams.
Parameters:
avgs - a Map of {win -> stream -> average value}
counts - a Map of {win -> stream -> count value}
Returns:
a Map of {win -> aggregated value}
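aggregateAvgStreams collapses the stream dimension of a weighted average within each window. A minimal sketch of one way to do that follows. It assumes a stream with no recorded count contributes weight 0 and that an all-zero window yields 0.0. Both assumptions, and the class name, are illustrative.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: for each window, sum(avg * count) / sum(count) over its streams.
final class AvgStreamsSketch {
    static <K> Map<String, Double> aggregateAvgStreams(Map<String, Map<K, Double>> avgs,
                                                       Map<String, Map<K, Long>> counts) {
        Map<String, Double> ret = new HashMap<>();
        for (Map.Entry<String, Map<K, Double>> win : avgs.entrySet()) {
            Map<K, Long> winCounts = counts.getOrDefault(win.getKey(), Map.of());
            double weighted = 0.0;
            long total = 0L;
            for (Map.Entry<K, Double> s : win.getValue().entrySet()) {
                long n = winCounts.getOrDefault(s.getKey(), 0L);
                weighted += s.getValue() * n;
                total += n;
            }
            ret.put(win.getKey(), total == 0 ? 0.0 : weighted / total);
        }
        return ret;
    }
}
```

Streams averaging 2.0 (count 1) and 5.0 (count 2) in one window combine to (2.0 + 10.0) / 3 = 4.0.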
-
spoutStreamsStats
public static Map<String,Map> spoutStreamsStats(List<ExecutorSummary> summs, boolean includeSys)
Aggregates spout stream stats and returns a Map of {metric -> win -> aggregated value}.
-
boltStreamsStats
public static Map<String,Map> boltStreamsStats(List<ExecutorSummary> summs, boolean includeSys)
Aggregates bolt stream stats and returns a Map of {metric -> win -> aggregated value}.
-
aggregateSpoutStreams
public static Map<String,Map> aggregateSpoutStreams(Map<String,Map> stats)
Aggregates all spout streams.
Parameters:
stats - a Map of {metric -> win -> stream id -> value}
Returns:
a Map of {metric -> win -> aggregated value}
-
aggregateBoltStreams
public static Map<String,Map> aggregateBoltStreams(Map<String,Map> stats)
Aggregates all bolt streams.
Parameters:
stats - a Map of {metric -> win -> stream id -> value}
Returns:
a Map of {metric -> win -> aggregated value}
-
aggBoltExecWinStats
public static Map<String,Object> aggBoltExecWinStats(Map<String,Object> accStats, Map<String,Object> newStats, boolean includeSys)
Aggregates the windowed stats of a bolt executor with a Map of accumulated stats.
-
aggSpoutExecWinStats
public static Map<String,Object> aggSpoutExecWinStats(Map<String,Object> accStats, Map<String,Object> beat, boolean includeSys)
Aggregates the windowed stats of a spout executor with a Map of accumulated stats.
-
aggregateCounts
public static <T> Map<String,Map<T,Long>> aggregateCounts(List<Map<String,Map<T,Long>>> countsSeq)
Aggregates a list of count maps into one map.
Parameters:
countsSeq - a seq of {win -> GlobalStreamId -> value}
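Merging a list of count maps, as aggregateCounts describes, can be done by summing matching (window, key) entries into one accumulator. The sketch below is a hypothetical illustration of that idea using Map.merge. It assumes no null values, because merge rejects them.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: sums a list of {win -> key -> count} maps into one map of the same shape.
final class CountsSketch {
    static <T> Map<String, Map<T, Long>> aggregateCounts(List<Map<String, Map<T, Long>>> countsSeq) {
        Map<String, Map<T, Long>> ret = new HashMap<>();
        for (Map<String, Map<T, Long>> counts : countsSeq) {
            for (Map.Entry<String, Map<T, Long>> win : counts.entrySet()) {
                Map<T, Long> acc = ret.computeIfAbsent(win.getKey(), w -> new HashMap<>());
                for (Map.Entry<T, Long> e : win.getValue().entrySet()) {
                    acc.merge(e.getKey(), e.getValue(), Long::sum);
                }
            }
        }
        return ret;
    }
}
```

Keys that appear in only one input map are carried over unchanged. Keys that appear in several are summed.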
-
aggregateCompStats
public static Map<String,Object> aggregateCompStats(String window, boolean includeSys, List<Map<String,Object>> beats, String compType)
Aggregate the stats for a component over a given window of time.
-
aggCompExecStats
public static Map<String,Object> aggCompExecStats(String window, boolean includeSys, Map<String,Object> accStats, Map<String,Object> beat, String compType)
Combines the aggregate stats of one executor with the given map, selecting the appropriate window and including system components as specified.
-
postAggregateCompStats
public static Map<String,Object> postAggregateCompStats(Map<String,Object> compStats)
Post-aggregates component stats:
1. computes execute-latency/process-latency from the execute/process latency totals
2. computes windowed weighted averages
3. transforms Map keys
Parameters:
compStats - accumulated comp stats
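Step 1 of postAggregateCompStats recovers average latencies from accumulated totals. The sketch below shows that step under assumptions. Each total is taken to be average * count summed over executors, so dividing by the executed count gives the average back. The key names executed, executeLatencyTotal, processLatencyTotal, executeLatency, and processLatency, and the class PostAggregateSketch, are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of step 1 only; key names are illustrative, not Storm's.
final class PostAggregateSketch {
    static Map<String, Object> postAggregate(Map<String, Object> compStats) {
        Map<String, Object> ret = new HashMap<>(compStats);
        long executed = ((Number) compStats.getOrDefault("executed", 0L)).longValue();
        double execTotal = ((Number) compStats.getOrDefault("executeLatencyTotal", 0.0)).doubleValue();
        double procTotal = ((Number) compStats.getOrDefault("processLatencyTotal", 0.0)).doubleValue();
        // Replace the totals with averages; guard against division by zero.
        ret.remove("executeLatencyTotal");
        ret.remove("processLatencyTotal");
        ret.put("executeLatency", executed == 0 ? 0.0 : execTotal / executed);
        ret.put("processLatency", executed == 0 ? 0.0 : procTotal / executed);
        return ret;
    }
}
```

An accumulated execute-latency total of 15.0 over 15 executed tuples becomes an execute latency of 1.0.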
-
aggCompExecsStats
public static ComponentPageInfo aggCompExecsStats(Map exec2hostPort, Map task2component, Map<List<Integer>,Map<String,Object>> beats, String window, boolean includeSys, String topologyId, StormTopology topology, String componentId)
Aggregates component executor stats.
Parameters:
exec2hostPort - a Map of {executor -> host+port}
task2component - a Map of {task id -> component}
beats - a converted HashMap of executor heartbeats, {executor -> heartbeat}
window - specified window
includeSys - whether to include system streams
topologyId - topology id
topology - storm topology
componentId - component id
Returns:
ComponentPageInfo thrift structure
-
aggWorkerStats
public static List<WorkerSummary> aggWorkerStats(String stormId, String stormName, Map<Integer,String> task2Component, Map<List<Integer>,Map<String,Object>> beats, Map<List<Long>,List<Object>> exec2NodePort, Map<String,String> nodeHost, Map<WorkerSlot,WorkerResources> worker2Resources, boolean includeSys, boolean userAuthorized, String filterSupervisor, String owner)
Aggregates statistics per worker for a topology, optionally filtering on a specific supervisor.
Parameters:
stormId - topology id
stormName - topology name
task2Component - a Map of {task id -> component}
beats - a converted HashMap of executor heartbeats, {executor -> heartbeat}
exec2NodePort - a Map of {executor -> host+port}
includeSys - whether to include system streams
userAuthorized - whether the user is authorized to view topology info
filterSupervisor - if not null, only return WorkerSummaries for that supervisor
owner - owner of the topology
-
convertExecutorBeats
public static Map<List<Integer>,Map<String,Object>> convertExecutorBeats(Map<ExecutorInfo,ExecutorBeat> beats)
Converts thrift executor heartbeats into a Java HashMap.
-
convertWorkerBeats
public static Map<List<Integer>,Map<String,Object>> convertWorkerBeats(SupervisorWorkerHeartbeat workerHeartbeat)
Converts a SupervisorWorkerHeartbeat into Nimbus local-report executor heartbeats.
-
convertZkExecutorHb
public static Map<String,Object> convertZkExecutorHb(ExecutorBeat beat)
Converts a thrift ExecutorBeat into a Java HashMap.
-
convertZkWorkerHb
public static Map<String,Object> convertZkWorkerHb(ClusterWorkerHeartbeat workerHb)
Converts a thrift worker heartbeat into a Java HashMap.
-
convertExecutorsStats
public static Map<List<Integer>,ExecutorStats> convertExecutorsStats(Map<ExecutorInfo,ExecutorStats> stats)
Converts executor stats into a HashMap; the ExecutorStats values themselves remain unchanged.
-
convertExecutorStats
public static Map<String,Object> convertExecutorStats(ExecutorStats stats)
Converts a thrift ExecutorStats structure into a Java HashMap.
-
extractNodeInfosFromHbForComp
public static List<Map<String,Object>> extractNodeInfosFromHbForComp(Map<List<? extends Number>,List<Object>> exec2hostPort, Map<Integer,String> task2component, boolean includeSys, String compId)
Extracts a list of host/port info for the specified component.
Parameters:
exec2hostPort - {executor -> host+port}
task2component - {task id -> component}
includeSys - whether to include system streams
compId - component id
Returns:
a list of host+port
-
extractDataFromHb
public static List<Map<String,Object>> extractDataFromHb(Map executor2hostPort, Map task2component, Map<List<Integer>,Map<String,Object>> beats, boolean includeSys, StormTopology topology)
Extracts a list of executor data from heartbeats.
-
extractDataFromHb
public static List<Map<String,Object>> extractDataFromHb(Map executor2hostPort, Map task2component, Map<List<Integer>,Map<String,Object>> beats, boolean includeSys, StormTopology topology, String compId)
Extracts a list of executor data from heartbeats.
-
computeBoltCapacity
public static double computeBoltCapacity(List<ExecutorSummary> executorSumms)
Computes the maximum bolt capacity across the given executors.
Parameters:
executorSumms - a list of ExecutorSummary
Returns:
max bolt capacity
-
computeExecutorCapacity
public static double computeExecutorCapacity(ExecutorSummary summary)
Computes the capacity of an executor, an approximation of the percentage of time spent doing real work.
Parameters:
summary - the stats for the executor.
Returns:
the capacity of the executor.
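One common way to approximate "the percentage of time spent doing real work" is used in the sketch below. It is an assumption, not read from this page: executed count * average execute latency (ms), divided by the window length in ms. The window is taken as TEN_MIN_IN_SECONDS (assumed to equal 600) or the executor's uptime if shorter. The bolt-level value, in the spirit of computeBoltCapacity, is then the maximum over its executors. CapacitySketch and its methods take plain numbers instead of a thrift ExecutorSummary and are hypothetical.

```java
import java.util.List;

// Hypothetical sketch of executor and bolt capacity; the formula is an assumption.
final class CapacitySketch {
    static final int TEN_MIN_IN_SECONDS = 600; // assumed value of the StatsUtil constant

    // executed * avg execute latency (ms) / window length (ms); near 1.0 means the executor is saturated.
    static double executorCapacity(long executed, double executeLatencyMs, int uptimeSecs) {
        int window = Math.min(uptimeSecs, TEN_MIN_IN_SECONDS);
        if (window <= 0) {
            return 0.0;
        }
        return executed * executeLatencyMs / (1000.0 * window);
    }

    // A bolt is reported at the capacity of its busiest executor.
    static double boltCapacity(List<Double> executorCapacities) {
        double max = 0.0;
        for (double c : executorCapacities) {
            max = Math.max(max, c);
        }
        return max;
    }
}
```

Under these assumptions, 300,000 tuples at 1.0 ms each over a full 600-second window give a capacity of 0.5. The same work over a 300-second uptime gives 1.0. Taking the maximum rather than the mean surfaces a single hot executor that would otherwise be averaged away.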
-
getFilledStats
public static List<ExecutorSummary> getFilledStats(List<ExecutorSummary> summs)
Filters out ExecutorSummary entries whose stats are null.
Parameters:
summs - a list of ExecutorSummary
Returns:
the filtered list
-
thriftifyRpcWorkerHb
public static SupervisorWorkerHeartbeat thriftifyRpcWorkerHb(String stormId, List<Long> executorId)
Used for local testing.
-
thriftifyExecutorStats
public static ExecutorStats thriftifyExecutorStats(Map stats)
Converts executor stats to a thrift data structure.
Parameters:
stats - the stats in the form of a map.
Returns:
the thrift structure for the stats.
-
componentType
public static String componentType(StormTopology topology, String compId)
Gets the component type for a given id.
Parameters:
topology - the topology this is a part of.
compId - the id of the component.
Returns:
the type as a String, "BOLT" or "SPOUT".
-
floatStr
public static String floatStr(Double n)
Converts a float to a string for display.
Parameters:
n - the value to format.
Returns:
the string ready for display.
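A display formatter like floatStr can be sketched as below. Three details are assumptions and are not stated on this page: the precision (three decimal places), the null fallback ("0"), and the class name. Passing Locale.ROOT keeps "." as the decimal separator whatever the JVM's default locale is.

```java
import java.util.Locale;

// Hypothetical sketch; precision and null handling are assumptions.
final class FloatStrSketch {
    static String floatStr(Double n) {
        if (n == null) {
            return "0";
        }
        // Fixed three-decimal formatting, independent of the default locale.
        return String.format(Locale.ROOT, "%.3f", n);
    }
}
```

With these choices, the latency 0.44313 from the aggBoltLatAndCount example would display as "0.443".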
-