public class StatsUtil extends Object
Modifier and Type | Field and Description |
---|---|
static int |
TEN_MIN_IN_SECONDS |
static String |
TEN_MIN_IN_SECONDS_STR |
static String |
TYPE |
Constructor and Description |
---|
StatsUtil() |
Modifier and Type | Method and Description |
---|---|
static Map<String,Object> |
aggBoltExecWinStats(Map<String,Object> accStats,
Map<String,Object> newStats,
boolean includeSys)
aggregate windowed stats from a bolt executor stats with a Map of accumulated stats.
|
static Map<String,Number> |
aggBoltLatAndCount(Map<List<String>,Double> id2execAvg,
Map<List<String>,Double> id2procAvg,
Map<List<String>,Long> id2numExec)
Aggregates number executed, process latency, and execute latency across all streams.
|
static <K> Map<K,Map> |
aggBoltStreamsLatAndCount(Map<K,Double> id2execAvg,
Map<K,Double> id2procAvg,
Map<K,Long> id2numExec)
aggregate number executed and process & execute latencies.
|
static ComponentPageInfo |
aggCompExecsStats(Map exec2hostPort,
Map task2component,
Map<List<Integer>,Map<String,Object>> beats,
String window,
boolean includeSys,
String topologyId,
StormTopology topology,
String componentId)
aggregate component executor stats.
|
static Map<String,Object> |
aggCompExecStats(String window,
boolean includeSys,
Map<String,Object> accStats,
Map<String,Object> beat,
String compType)
Combines the aggregate stats of one executor with the given map, selecting the appropriate window and including system components as specified.
|
static Map<String,Object> |
aggPreMergeCompPageBolt(Map<String,Object> beat,
String window,
boolean includeSys)
pre-merge component page bolt stats from an executor heartbeat 1.
|
static Map<String,Object> |
aggPreMergeCompPageSpout(Map<String,Object> beat,
String window,
boolean includeSys)
pre-merge component page spout stats from an executor heartbeat 1.
|
static <K,V extends Number> |
aggPreMergeTopoPageBolt(Map<String,Object> beat,
String window,
boolean includeSys)
pre-merge component stats of specified bolt id.
|
static <K,V extends Number> |
aggPreMergeTopoPageSpout(Map<String,Object> m,
String window,
boolean includeSys)
pre-merge component stats of specified spout id and returns { comp id -> comp-stats }.
|
static <K> Map<String,Map<K,Double>> |
aggregateAverages(List<Map<String,Map<K,Double>>> avgSeq,
List<Map<String,Map<K,Long>>> countSeq)
compute an weighted average from a list of average maps and a corresponding count maps extracted from a list of ExecutorSummary.
|
static <K> Map<String,Double> |
aggregateAvgStreams(Map<String,Map<K,Double>> avgs,
Map<String,Map<K,Long>> counts)
aggregate weighted average of all streams.
|
static <T> Map<String,Map> |
aggregateBoltStats(List<ExecutorSummary> statsSeq,
boolean includeSys)
aggregate bolt stats.
|
static Map<String,Map> |
aggregateBoltStreams(Map<String,Map> stats)
aggregate all bolt streams.
|
static <T> Map<String,Map<String,Map<T,Long>>> |
aggregateCommonStats(List<ExecutorSummary> statsSeq)
aggregate common stats from a spout/bolt, called in aggregateSpoutStats/aggregateBoltStats.
|
static Map<String,Object> |
aggregateCompStats(String window,
boolean includeSys,
List<Map<String,Object>> beats,
String compType)
Aggregate the stats for a component over a given window of time.
|
static <T> Map<String,Map<T,Long>> |
aggregateCounts(List<Map<String,Map<T,Long>>> countsSeq)
aggregate a list of count maps into one map.
|
static <K,V extends Number> |
aggregateCountStreams(Map<String,Map<K,V>> stats)
aggregate count streams by window.
|
static Map<String,Map> |
aggregateSpoutStats(List<ExecutorSummary> statsSeq,
boolean includeSys)
aggregate spout stats.
|
static Map<String,Map> |
aggregateSpoutStreams(Map<String,Map> stats)
aggregate all spout streams.
|
static Map<String,Object> |
aggSpoutExecWinStats(Map<String,Object> accStats,
Map<String,Object> beat,
boolean includeSys)
aggregate windowed stats from a spout executor stats with a Map of accumulated stats.
|
static Map<String,Number> |
aggSpoutLatAndCount(Map<String,Double> id2compAvg,
Map<String,Long> id2numAcked)
aggregate number acked and complete latencies across all streams.
|
static <K> Map<K,Map> |
aggSpoutStreamsLatAndCount(Map<K,Double> id2compAvg,
Map<K,Long> id2acked)
Aggregates number acked and complete latencies.
|
static TopologyPageInfo |
aggTopoExecsStats(String topologyId,
Map exec2nodePort,
Map task2component,
Map<List<Integer>,Map<String,Object>> beats,
StormTopology topology,
String window,
boolean includeSys,
IStormClusterState clusterState)
aggregate topo executors stats.
|
static Map<String,Object> |
aggTopoExecStats(String window,
boolean includeSys,
Map<String,Object> accStats,
Map<String,Object> beat,
String compType)
A helper function that does the common work to aggregate stats of one executor with the given map for the topology page.
|
static List<WorkerSummary> |
aggWorkerStats(String stormId,
String stormName,
Map<Integer,String> task2Component,
Map<List<Integer>,Map<String,Object>> beats,
Map<List<Long>,List<Object>> exec2NodePort,
Map<String,String> nodeHost,
Map<WorkerSlot,WorkerResources> worker2Resources,
boolean includeSys,
boolean userAuthorized,
String filterSupervisor,
String owner)
aggregate statistics per worker for a topology.
|
static Map<String,Map> |
boltStreamsStats(List<ExecutorSummary> summs,
boolean includeSys)
aggregates bolt stream stats, returns a Map of {metric -> win -> aggregated value}.
|
static String |
componentType(StormTopology topology,
String compId)
Get the coponenet type for a give id.
|
static double |
computeBoltCapacity(List<ExecutorSummary> executorSumms)
computes max bolt capacity.
|
static double |
computeExecutorCapacity(ExecutorSummary summary)
Compute the capacity of a executor.
|
static Map<List<Integer>,Map<String,Object>> |
convertExecutorBeats(Map<ExecutorInfo,ExecutorBeat> beats)
convert thrift executor heartbeats into a java HashMap.
|
static Map<List<Integer>,ExecutorStats> |
convertExecutorsStats(Map<ExecutorInfo,ExecutorStats> stats)
convert executors stats into a HashMap, note that ExecutorStats are remained unchanged.
|
static Map<String,Object> |
convertExecutorStats(ExecutorStats stats)
convert thrift ExecutorStats structure into a java HashMap.
|
static Map<List<Integer>,Map<String,Object>> |
convertWorkerBeats(SupervisorWorkerHeartbeat workerHeartbeat)
convert
SupervisorWorkerHeartbeat to nimbus local report executor heartbeats. |
static Map<String,Object> |
convertZkExecutorHb(ExecutorBeat beat)
convert thrift ExecutorBeat into a java HashMap.
|
static Map<String,Object> |
convertZkWorkerHb(ClusterWorkerHeartbeat workerHb)
convert a thrift worker heartbeat into a java HashMap.
|
static String |
errorSubset(String errorStr) |
static List<Map<String,Object>> |
extractDataFromHb(Map executor2hostPort,
Map task2component,
Map<List<Integer>,Map<String,Object>> beats,
boolean includeSys,
StormTopology topology)
extracts a list of executor data from heart beats.
|
static List<Map<String,Object>> |
extractDataFromHb(Map executor2hostPort,
Map task2component,
Map<List<Integer>,Map<String,Object>> beats,
boolean includeSys,
StormTopology topology,
String compId)
extracts a list of executor data from heart beats.
|
static List<Map<String,Object>> |
extractNodeInfosFromHbForComp(Map<List<? extends Number>,List<Object>> exec2hostPort,
Map<Integer,String> task2component,
boolean includeSys,
String compId)
extract a list of host port info for specified component.
|
static String |
floatStr(Double n)
Convert a float to a string for display.
|
static List<ExecutorSummary> |
getFilledStats(List<ExecutorSummary> summs)
filter ExecutorSummary whose stats is null.
|
static Map<String,Object> |
mergeAggCompStatsCompPageBolt(Map<String,Object> accBoltStats,
Map<String,Object> boltStats)
merge accumulated bolt stats with pre-merged component stats.
|
static Map<String,Object> |
mergeAggCompStatsCompPageSpout(Map<String,Object> accSpoutStats,
Map<String,Object> spoutStats)
merge accumulated bolt stats with pre-merged component stats.
|
static Map<String,Object> |
mergeAggCompStatsTopoPageBolt(Map<String,Object> accBoltStats,
Map<String,Object> boltStats)
merge accumulated bolt stats with new bolt stats.
|
static Map<String,Object> |
mergeAggCompStatsTopoPageSpout(Map<String,Object> accSpoutStats,
Map<String,Object> spoutStats)
merge accumulated bolt stats with new bolt stats.
|
static Map<String,Object> |
postAggregateCompStats(Map<String,Object> compStats)
post aggregate component stats: 1.
|
static <T> Map<String,Map<String,Map<T,Long>>> |
preProcessStreamSummary(Map<String,Map<String,Map<T,Long>>> streamSummary,
boolean includeSys)
filter system streams of aggregated spout/bolt stats if necessary.
|
static Map<String,Map> |
spoutStreamsStats(List<ExecutorSummary> summs,
boolean includeSys)
aggregates spout stream stats, returns a Map of {metric -> win -> aggregated value}.
|
static ExecutorStats |
thriftifyExecutorStats(Map stats)
Convert Executor stats to thrift data structure.
|
static SupervisorWorkerHeartbeat |
thriftifyRpcWorkerHb(String stormId,
List<Long> executorId)
Used for local test.
|
static <K> Map |
windowSetConverter(Map stats,
org.apache.storm.stats.ClientStatsUtil.KeyTransformer<K> firstKeyFunc) |
public static final String TYPE
public static final int TEN_MIN_IN_SECONDS
public static final String TEN_MIN_IN_SECONDS_STR
public static Map<String,Number> aggBoltLatAndCount(Map<List<String>,Double> id2execAvg, Map<List<String>,Double> id2procAvg, Map<List<String>,Long> id2numExec)
Aggregates number executed, process latency, and execute latency across all streams.
id2execAvg
- { global stream id -> exec avg value }, e.g., {[“split” “default”] 0.44313}id2procAvg
- { global stream id -> proc avg value }id2numExec
- { global stream id -> executed }public static Map<String,Number> aggSpoutLatAndCount(Map<String,Double> id2compAvg, Map<String,Long> id2numAcked)
aggregate number acked and complete latencies across all streams.
public static <K> Map<K,Map> aggBoltStreamsLatAndCount(Map<K,Double> id2execAvg, Map<K,Double> id2procAvg, Map<K,Long> id2numExec)
aggregate number executed and process & execute latencies.
public static <K> Map<K,Map> aggSpoutStreamsLatAndCount(Map<K,Double> id2compAvg, Map<K,Long> id2acked)
Aggregates number acked and complete latencies.
public static Map<String,Object> aggPreMergeCompPageBolt(Map<String,Object> beat, String window, boolean includeSys)
pre-merge component page bolt stats from an executor heartbeat 1. computes component capacity 2. converts map keys of stats 3. filters streams if necessary
beat
- executor heartbeat datawindow
- specified windowincludeSys
- whether to include system streamspublic static Map<String,Object> aggPreMergeCompPageSpout(Map<String,Object> beat, String window, boolean includeSys)
pre-merge component page spout stats from an executor heartbeat 1. computes component capacity 2. converts map keys of stats 3. filters streams if necessary
beat
- executor heartbeat datawindow
- specified windowincludeSys
- whether to include system streamspublic static <K,V extends Number> Map<String,Object> aggPreMergeTopoPageBolt(Map<String,Object> beat, String window, boolean includeSys)
pre-merge component stats of specified bolt id.
beat
- executor heartbeat datawindow
- specified windowincludeSys
- whether to include system streamspublic static <K,V extends Number> Map<String,Object> aggPreMergeTopoPageSpout(Map<String,Object> m, String window, boolean includeSys)
pre-merge component stats of specified spout id and returns { comp id -> comp-stats }.
public static Map<String,Object> mergeAggCompStatsCompPageBolt(Map<String,Object> accBoltStats, Map<String,Object> boltStats)
merge accumulated bolt stats with pre-merged component stats.
accBoltStats
- accumulated bolt statsboltStats
- pre-merged component statspublic static Map<String,Object> mergeAggCompStatsCompPageSpout(Map<String,Object> accSpoutStats, Map<String,Object> spoutStats)
merge accumulated bolt stats with pre-merged component stats.
public static Map<String,Object> mergeAggCompStatsTopoPageBolt(Map<String,Object> accBoltStats, Map<String,Object> boltStats)
merge accumulated bolt stats with new bolt stats.
accBoltStats
- accumulated bolt statsboltStats
- new input bolt statspublic static Map<String,Object> mergeAggCompStatsTopoPageSpout(Map<String,Object> accSpoutStats, Map<String,Object> spoutStats)
merge accumulated bolt stats with new bolt stats.
public static Map<String,Object> aggTopoExecStats(String window, boolean includeSys, Map<String,Object> accStats, Map<String,Object> beat, String compType)
A helper function that does the common work to aggregate stats of one executor with the given map for the topology page.
public static TopologyPageInfo aggTopoExecsStats(String topologyId, Map exec2nodePort, Map task2component, Map<List<Integer>,Map<String,Object>> beats, StormTopology topology, String window, boolean includeSys, IStormClusterState clusterState)
aggregate topo executors stats.
topologyId
- topology idexec2nodePort
- executor -> host+porttask2component
- task -> componentbeats
- executor[start, end] -> executor heartbeattopology
- storm topologywindow
- the window to be aggregatedincludeSys
- whether to include system streamsclusterState
- cluster statepublic static <T> Map<String,Map> aggregateBoltStats(List<ExecutorSummary> statsSeq, boolean includeSys)
aggregate bolt stats.
statsSeq
- a seq of ExecutorStatsincludeSys
- whether to include system streamspublic static Map<String,Map> aggregateSpoutStats(List<ExecutorSummary> statsSeq, boolean includeSys)
aggregate spout stats.
statsSeq
- a seq of ExecutorStatsincludeSys
- whether to include system streamspublic static <T> Map<String,Map<String,Map<T,Long>>> aggregateCommonStats(List<ExecutorSummary> statsSeq)
aggregate common stats from a spout/bolt, called in aggregateSpoutStats/aggregateBoltStats.
public static <T> Map<String,Map<String,Map<T,Long>>> preProcessStreamSummary(Map<String,Map<String,Map<T,Long>>> streamSummary, boolean includeSys)
filter system streams of aggregated spout/bolt stats if necessary.
public static <K,V extends Number> Map<String,Long> aggregateCountStreams(Map<String,Map<K,V>> stats)
aggregate count streams by window.
stats
- a Map of value: {win -> stream -> value}public static <K> Map<String,Map<K,Double>> aggregateAverages(List<Map<String,Map<K,Double>>> avgSeq, List<Map<String,Map<K,Long>>> countSeq)
compute an weighted average from a list of average maps and a corresponding count maps extracted from a list of ExecutorSummary.
avgSeq
- a list of {win -> global stream id -> avg value}countSeq
- a list of {win -> global stream id -> count value}public static <K> Map<String,Double> aggregateAvgStreams(Map<String,Map<K,Double>> avgs, Map<String,Map<K,Long>> counts)
aggregate weighted average of all streams.
avgs
- a Map of {win -> stream -> average value}counts
- a Map of {win -> stream -> count value}public static Map<String,Map> spoutStreamsStats(List<ExecutorSummary> summs, boolean includeSys)
aggregates spout stream stats, returns a Map of {metric -> win -> aggregated value}.
public static Map<String,Map> boltStreamsStats(List<ExecutorSummary> summs, boolean includeSys)
aggregates bolt stream stats, returns a Map of {metric -> win -> aggregated value}.
public static Map<String,Map> aggregateSpoutStreams(Map<String,Map> stats)
aggregate all spout streams.
stats
- a Map of {metric -> win -> stream id -> value}public static Map<String,Map> aggregateBoltStreams(Map<String,Map> stats)
aggregate all bolt streams.
stats
- a Map of {metric -> win -> stream id -> value}public static Map<String,Object> aggBoltExecWinStats(Map<String,Object> accStats, Map<String,Object> newStats, boolean includeSys)
aggregate windowed stats from a bolt executor stats with a Map of accumulated stats.
public static Map<String,Object> aggSpoutExecWinStats(Map<String,Object> accStats, Map<String,Object> beat, boolean includeSys)
aggregate windowed stats from a spout executor stats with a Map of accumulated stats.
public static <T> Map<String,Map<T,Long>> aggregateCounts(List<Map<String,Map<T,Long>>> countsSeq)
aggregate a list of count maps into one map.
countsSeq
- a seq of {win -> GlobalStreamId -> value}public static Map<String,Object> aggregateCompStats(String window, boolean includeSys, List<Map<String,Object>> beats, String compType)
Aggregate the stats for a component over a given window of time.
public static Map<String,Object> aggCompExecStats(String window, boolean includeSys, Map<String,Object> accStats, Map<String,Object> beat, String compType)
Combines the aggregate stats of one executor with the given map, selecting the appropriate window and including system components as specified.
public static Map<String,Object> postAggregateCompStats(Map<String,Object> compStats)
post aggregate component stats: 1. computes execute-latency/process-latency from execute/process latency total 2. computes windowed weight avgs 3. transform Map keys
compStats
- accumulated comp statspublic static ComponentPageInfo aggCompExecsStats(Map exec2hostPort, Map task2component, Map<List<Integer>,Map<String,Object>> beats, String window, boolean includeSys, String topologyId, StormTopology topology, String componentId)
aggregate component executor stats.
exec2hostPort
- a Map of {executor -> host+port}task2component
- a Map of {task id -> component}beats
- a converted HashMap of executor heartbeats, {executor -> heartbeat}window
- specified windowincludeSys
- whether to include system streamstopologyId
- topology idtopology
- storm topologycomponentId
- component idpublic static List<WorkerSummary> aggWorkerStats(String stormId, String stormName, Map<Integer,String> task2Component, Map<List<Integer>,Map<String,Object>> beats, Map<List<Long>,List<Object>> exec2NodePort, Map<String,String> nodeHost, Map<WorkerSlot,WorkerResources> worker2Resources, boolean includeSys, boolean userAuthorized, String filterSupervisor, String owner)
aggregate statistics per worker for a topology. Optionally filtering on specific supervisors
stormId
- topology idstormName
- storm topologytask2Component
- a Map of {task id -> component}beats
- a converted HashMap of executor heartbeats, {executor -> heartbeat}exec2NodePort
- a Map of {executor -> host+port}includeSys
- whether to include system streamsuserAuthorized
- whether the user is authorized to view topology infofilterSupervisor
- if not null, only return WorkerSummaries for that supervisorowner
- owner of the topologypublic static Map<List<Integer>,Map<String,Object>> convertExecutorBeats(Map<ExecutorInfo,ExecutorBeat> beats)
convert thrift executor heartbeats into a java HashMap.
public static Map<List<Integer>,Map<String,Object>> convertWorkerBeats(SupervisorWorkerHeartbeat workerHeartbeat)
convert SupervisorWorkerHeartbeat
to nimbus local report executor heartbeats.
public static Map<String,Object> convertZkExecutorHb(ExecutorBeat beat)
convert thrift ExecutorBeat into a java HashMap.
public static Map<String,Object> convertZkWorkerHb(ClusterWorkerHeartbeat workerHb)
convert a thrift worker heartbeat into a java HashMap.
public static Map<List<Integer>,ExecutorStats> convertExecutorsStats(Map<ExecutorInfo,ExecutorStats> stats)
convert executors stats into a HashMap, note that ExecutorStats are remained unchanged.
public static Map<String,Object> convertExecutorStats(ExecutorStats stats)
convert thrift ExecutorStats structure into a java HashMap.
public static List<Map<String,Object>> extractNodeInfosFromHbForComp(Map<List<? extends Number>,List<Object>> exec2hostPort, Map<Integer,String> task2component, boolean includeSys, String compId)
extract a list of host port info for specified component.
exec2hostPort
- {executor -> host+port}task2component
- {task id -> component}includeSys
- whether to include system streamscompId
- component idpublic static List<Map<String,Object>> extractDataFromHb(Map executor2hostPort, Map task2component, Map<List<Integer>,Map<String,Object>> beats, boolean includeSys, StormTopology topology)
extracts a list of executor data from heart beats.
public static List<Map<String,Object>> extractDataFromHb(Map executor2hostPort, Map task2component, Map<List<Integer>,Map<String,Object>> beats, boolean includeSys, StormTopology topology, String compId)
extracts a list of executor data from heart beats.
public static double computeBoltCapacity(List<ExecutorSummary> executorSumms)
computes max bolt capacity.
executorSumms
- a list of ExecutorSummarypublic static double computeExecutorCapacity(ExecutorSummary summary)
Compute the capacity of a executor. approximation of the % of time spent doing real work.
summary
- the stats for the executor.public static List<ExecutorSummary> getFilledStats(List<ExecutorSummary> summs)
filter ExecutorSummary whose stats is null.
summs
- a list of ExecutorSummarypublic static SupervisorWorkerHeartbeat thriftifyRpcWorkerHb(String stormId, List<Long> executorId)
Used for local test.
public static ExecutorStats thriftifyExecutorStats(Map stats)
Convert Executor stats to thrift data structure.
stats
- the stats in the form of a map.public static String componentType(StormTopology topology, String compId)
Get the coponenet type for a give id.
topology
- the topology this is a part of.compId
- the id of the component.public static String floatStr(Double n)
Convert a float to a string for display.
n
- the value to format.Copyright © 2022 The Apache Software Foundation. All rights reserved.