Class StatsUtil


  • public class StatsUtil
    extends Object
    • Constructor Detail

      • StatsUtil

        public StatsUtil()
    • Method Detail

      • aggBoltLatAndCount

        public static Map<String,​Number> aggBoltLatAndCount​(Map<List<String>,​Double> id2execAvg,
                                                                  Map<List<String>,​Double> id2procAvg,
                                                                  Map<List<String>,​Long> id2numExec)
        Aggregates number executed, process latency, and execute latency across all streams.
        Parameters:
        id2execAvg - { global stream id -> exec avg value }, e.g., {["split" "default"] 0.44313}
        id2procAvg - { global stream id -> proc avg value }
        id2numExec - { global stream id -> executed }
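        One way to read "aggregates across all streams" is as a count-weighted total: each stream's average latency is multiplied by its executed count before summing, so busy streams contribute proportionally. The sketch below illustrates that idea with plain Java collections. The class and helper names (BoltLatAndCountSketch, weightedTotal, totalCount) are hypothetical and are not part of StatsUtil, and the exact keys and values StatsUtil returns are not shown here.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only; not the StatsUtil implementation.
final class BoltLatAndCountSketch {

    /** Sum of (average * count) over every stream id that also has a count. */
    static double weightedTotal(Map<List<String>, Double> id2avg,
                                Map<List<String>, Long> id2count) {
        double total = 0.0;
        for (Map.Entry<List<String>, Double> e : id2avg.entrySet()) {
            Long count = id2count.get(e.getKey());
            if (count != null) {
                total += e.getValue() * count;
            }
        }
        return total;
    }

    /** Total number executed across all streams. */
    static long totalCount(Map<List<String>, Long> id2count) {
        long sum = 0L;
        for (long c : id2count.values()) {
            sum += c;
        }
        return sum;
    }

    public static void main(String[] args) {
        // Global stream ids are modeled as [component, stream] pairs.
        Map<List<String>, Double> id2execAvg = new HashMap<>();
        id2execAvg.put(Arrays.asList("split", "default"), 0.5);
        id2execAvg.put(Arrays.asList("count", "default"), 2.0);

        Map<List<String>, Long> id2numExec = new HashMap<>();
        id2numExec.put(Arrays.asList("split", "default"), 100L);
        id2numExec.put(Arrays.asList("count", "default"), 50L);

        System.out.println("execute latency total = " + weightedTotal(id2execAvg, id2numExec));
        System.out.println("executed = " + totalCount(id2numExec));
    }
}
```

        Dividing the weighted total by the total count would give the overall average latency.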
      • aggSpoutLatAndCount

        public static Map<String,​Number> aggSpoutLatAndCount​(Map<String,​Double> id2compAvg,
                                                                   Map<String,​Long> id2numAcked)
        Aggregates number acked and complete latencies across all streams.
      • aggBoltStreamsLatAndCount

        public static <K> Map<K,​Map> aggBoltStreamsLatAndCount​(Map<K,​Double> id2execAvg,
                                                                     Map<K,​Double> id2procAvg,
                                                                     Map<K,​Long> id2numExec)
        Aggregates number executed, process latency, and execute latency.
      • aggSpoutStreamsLatAndCount

        public static <K> Map<K,​Map> aggSpoutStreamsLatAndCount​(Map<K,​Double> id2compAvg,
                                                                      Map<K,​Long> id2acked)
        Aggregates number acked and complete latencies.
      • aggPreMergeCompPageBolt

        public static Map<String,​Object> aggPreMergeCompPageBolt​(Map<String,​Object> beat,
                                                                       String window,
                                                                       boolean includeSys)
        Pre-merges component page bolt stats from an executor heartbeat:
        1. computes component capacity
        2. converts map keys of stats
        3. filters streams if necessary
        Parameters:
        beat - executor heartbeat data
        window - specified window
        includeSys - whether to include system streams
        Returns:
        pre-merged stats
      • aggPreMergeCompPageSpout

        public static Map<String,​Object> aggPreMergeCompPageSpout​(Map<String,​Object> beat,
                                                                        String window,
                                                                        boolean includeSys)
        Pre-merges component page spout stats from an executor heartbeat:
        1. computes component capacity
        2. converts map keys of stats
        3. filters streams if necessary
        Parameters:
        beat - executor heartbeat data
        window - specified window
        includeSys - whether to include system streams
        Returns:
        pre-merged stats
      • aggPreMergeTopoPageBolt

        public static <K,V extends Number> Map<String,Object> aggPreMergeTopoPageBolt(Map<String,Object> beat,
                                                                                                  String window,
                                                                                                  boolean includeSys)
        pre-merge component stats of specified bolt id.
        Parameters:
        beat - executor heartbeat data
        window - specified window
        includeSys - whether to include system streams
        Returns:
        { comp id -> comp-stats }
      • aggPreMergeTopoPageSpout

        public static <K,V extends Number> Map<String,Object> aggPreMergeTopoPageSpout(Map<String,Object> m,
                                                                                                   String window,
                                                                                                   boolean includeSys)
        pre-merge component stats of specified spout id and returns { comp id -> comp-stats }.
      • mergeAggCompStatsCompPageBolt

        public static Map<String,​Object> mergeAggCompStatsCompPageBolt​(Map<String,​Object> accBoltStats,
                                                                             Map<String,​Object> boltStats)
        merge accumulated bolt stats with pre-merged component stats.
        Parameters:
        accBoltStats - accumulated bolt stats
        boltStats - pre-merged component stats
        Returns:
        merged stats
      • mergeAggCompStatsCompPageSpout

        public static Map<String,​Object> mergeAggCompStatsCompPageSpout​(Map<String,​Object> accSpoutStats,
                                                                              Map<String,​Object> spoutStats)
        merge accumulated spout stats with pre-merged component stats.
      • mergeAggCompStatsTopoPageBolt

        public static Map<String,​Object> mergeAggCompStatsTopoPageBolt​(Map<String,​Object> accBoltStats,
                                                                             Map<String,​Object> boltStats)
        merge accumulated bolt stats with new bolt stats.
        Parameters:
        accBoltStats - accumulated bolt stats
        boltStats - new input bolt stats
        Returns:
        merged bolt stats
      • mergeAggCompStatsTopoPageSpout

        public static Map<String,​Object> mergeAggCompStatsTopoPageSpout​(Map<String,​Object> accSpoutStats,
                                                                              Map<String,​Object> spoutStats)
        merge accumulated spout stats with new spout stats.
      • aggTopoExecStats

        public static Map<String,​Object> aggTopoExecStats​(String window,
                                                                boolean includeSys,
                                                                Map<String,​Object> accStats,
                                                                Map<String,​Object> beat,
                                                                String compType)
        A helper function that does the common work to aggregate stats of one executor with the given map for the topology page.
      • aggTopoExecsStats

        public static TopologyPageInfo aggTopoExecsStats​(String topologyId,
                                                         Map exec2nodePort,
                                                         Map task2component,
                                                         Map<List<Integer>,​Map<String,​Object>> beats,
                                                         StormTopology topology,
                                                         String window,
                                                         boolean includeSys,
                                                         IStormClusterState clusterState)
        aggregate topo executors stats.
        Parameters:
        topologyId - topology id
        exec2nodePort - executor -> host+port
        task2component - task -> component
        beats - executor[start, end] -> executor heartbeat
        topology - storm topology
        window - the window to be aggregated
        includeSys - whether to include system streams
        clusterState - cluster state
        Returns:
        TopologyPageInfo thrift structure
      • aggregateBoltStats

        public static <T> Map<String,​Map> aggregateBoltStats​(List<ExecutorSummary> statsSeq,
                                                                   boolean includeSys)
        aggregate bolt stats.
        Parameters:
        statsSeq - a list of ExecutorSummary
        includeSys - whether to include system streams
        Returns:
        aggregated bolt stats: {metric -> win -> global stream id -> value}
      • aggregateSpoutStats

        public static Map<String,​Map> aggregateSpoutStats​(List<ExecutorSummary> statsSeq,
                                                                boolean includeSys)
        aggregate spout stats.
        Parameters:
        statsSeq - a list of ExecutorSummary
        includeSys - whether to include system streams
        Returns:
        aggregated spout stats: {metric -> win -> global stream id -> value}
      • aggregateCommonStats

        public static <T> Map<String,​Map<String,​Map<T,​Long>>> aggregateCommonStats​(List<ExecutorSummary> statsSeq)
        aggregate common stats from a spout/bolt, called in aggregateSpoutStats/aggregateBoltStats.
      • preProcessStreamSummary

        public static <T> Map<String,​Map<String,​Map<T,​Long>>> preProcessStreamSummary​(Map<String,​Map<String,​Map<T,​Long>>> streamSummary,
                                                                                                        boolean includeSys)
        filter system streams of aggregated spout/bolt stats if necessary.
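        As a rough illustration of what filtering system streams can look like, the sketch below drops stream ids that start with "__" (the prefix Storm conventionally uses for system ids such as "__ack_init") when includeSys is false. It is simplified to a {win -> stream id -> count} map with String stream ids. SystemStreamFilterSketch, isSystemId, and filterSysStreams are hypothetical names, not StatsUtil methods.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only; not the StatsUtil implementation.
final class SystemStreamFilterSketch {

    /** Assumption: a stream id is a system id when it starts with "__". */
    static boolean isSystemId(String id) {
        return id.startsWith("__");
    }

    /** Returns the input unchanged when includeSys is true, else a copy without system streams. */
    static Map<String, Map<String, Long>> filterSysStreams(
            Map<String, Map<String, Long>> win2stream2count, boolean includeSys) {
        if (includeSys) {
            return win2stream2count;
        }
        Map<String, Map<String, Long>> ret = new HashMap<>();
        for (Map.Entry<String, Map<String, Long>> win : win2stream2count.entrySet()) {
            Map<String, Long> kept = new HashMap<>();
            for (Map.Entry<String, Long> stream : win.getValue().entrySet()) {
                if (!isSystemId(stream.getKey())) {
                    kept.put(stream.getKey(), stream.getValue());
                }
            }
            ret.put(win.getKey(), kept);
        }
        return ret;
    }

    public static void main(String[] args) {
        Map<String, Long> streams = new HashMap<>();
        streams.put("default", 5L);
        streams.put("__ack_init", 7L);
        Map<String, Map<String, Long>> stats = new HashMap<>();
        stats.put("600", streams);

        System.out.println(filterSysStreams(stats, false));
    }
}
```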
      • aggregateCountStreams

        public static <K,V extends Number> Map<String,Long> aggregateCountStreams(Map<String,Map<K,V>> stats)
        aggregate count streams by window.
        Parameters:
        stats - a Map of value: {win -> stream -> value}
        Returns:
        a Map of value: {win -> value}
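        The shape change from {win -> stream -> value} to {win -> value} can be sketched as a per-window sum. This is a minimal illustration under the assumption that the aggregation is a plain sum of counts. CountStreamsSketch and sumByWindow are hypothetical names, and the window keys "600" and ":all-time" are example values only.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only; not the StatsUtil implementation.
final class CountStreamsSketch {

    /** Collapses {win -> stream -> value} into {win -> sum of values}. */
    static <K, V extends Number> Map<String, Long> sumByWindow(Map<String, Map<K, V>> stats) {
        Map<String, Long> ret = new HashMap<>();
        for (Map.Entry<String, Map<K, V>> win : stats.entrySet()) {
            long sum = 0L;
            for (V value : win.getValue().values()) {
                sum += value.longValue();
            }
            ret.put(win.getKey(), sum);
        }
        return ret;
    }

    public static void main(String[] args) {
        Map<String, Long> tenMinutes = new HashMap<>();
        tenMinutes.put("default", 1L);
        tenMinutes.put("errors", 2L);
        Map<String, Long> allTime = new HashMap<>();
        allTime.put("default", 10L);

        Map<String, Map<String, Long>> stats = new HashMap<>();
        stats.put("600", tenMinutes);
        stats.put(":all-time", allTime);

        System.out.println(sumByWindow(stats));
    }
}
```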
      • aggregateAverages

        public static <K> Map<String,​Map<K,​Double>> aggregateAverages​(List<Map<String,​Map<K,​Double>>> avgSeq,
                                                                                  List<Map<String,​Map<K,​Long>>> countSeq)
        Computes a weighted average from a list of average maps and a corresponding list of count maps extracted from a list of ExecutorSummary.
        Parameters:
        avgSeq - a list of {win -> global stream id -> avg value}
        countSeq - a list of {win -> global stream id -> count value}
        Returns:
        a Map of {win -> global stream id -> weighted avg value}
      • aggregateAvgStreams

        public static <K> Map<String,​Double> aggregateAvgStreams​(Map<String,​Map<K,​Double>> avgs,
                                                                       Map<String,​Map<K,​Long>> counts)
        aggregate weighted average of all streams.
        Parameters:
        avgs - a Map of {win -> stream -> average value}
        counts - a Map of {win -> stream -> count value}
        Returns:
        a Map of {win -> aggregated value}
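        A count-weighted average per window can be sketched as follows: within each window, every stream's average is weighted by that stream's count, and the weighted sum is divided by the total count. AvgStreamsSketch and weightedAvgByWindow are hypothetical names, and returning 0.0 for a window with no counts is an assumption of this sketch, not documented behavior of StatsUtil.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only; not the StatsUtil implementation.
final class AvgStreamsSketch {

    /** Collapses {win -> stream -> avg} and {win -> stream -> count} into {win -> weighted avg}. */
    static <K> Map<String, Double> weightedAvgByWindow(Map<String, Map<K, Double>> avgs,
                                                       Map<String, Map<K, Long>> counts) {
        Map<String, Double> ret = new HashMap<>();
        for (Map.Entry<String, Map<K, Double>> win : avgs.entrySet()) {
            Map<K, Long> winCounts =
                counts.getOrDefault(win.getKey(), Collections.<K, Long>emptyMap());
            double weightedSum = 0.0;
            long totalCount = 0L;
            for (Map.Entry<K, Double> stream : win.getValue().entrySet()) {
                Long count = winCounts.get(stream.getKey());
                if (count == null) {
                    continue; // no count for this stream, so it carries no weight
                }
                weightedSum += stream.getValue() * count;
                totalCount += count;
            }
            // Assumption: a window without any counts averages to 0.0.
            ret.put(win.getKey(), totalCount == 0L ? 0.0 : weightedSum / totalCount);
        }
        return ret;
    }

    public static void main(String[] args) {
        Map<String, Double> avg = new HashMap<>();
        avg.put("a", 1.0);
        avg.put("b", 3.0);
        Map<String, Long> count = new HashMap<>();
        count.put("a", 1L);
        count.put("b", 3L);

        Map<String, Map<String, Double>> avgs = new HashMap<>();
        avgs.put("600", avg);
        Map<String, Map<String, Long>> counts = new HashMap<>();
        counts.put("600", count);

        // (1.0 * 1 + 3.0 * 3) / (1 + 3)
        System.out.println(weightedAvgByWindow(avgs, counts));
    }
}
```

        A plain mean of the two averages would give 2.0. Weighting by count moves the result toward the busier stream.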
      • spoutStreamsStats

        public static Map<String,​Map> spoutStreamsStats​(List<ExecutorSummary> summs,
                                                              boolean includeSys)
        aggregates spout stream stats, returns a Map of {metric -> win -> aggregated value}.
      • boltStreamsStats

        public static Map<String,​Map> boltStreamsStats​(List<ExecutorSummary> summs,
                                                             boolean includeSys)
        aggregates bolt stream stats, returns a Map of {metric -> win -> aggregated value}.
      • aggregateSpoutStreams

        public static Map<String,​Map> aggregateSpoutStreams​(Map<String,​Map> stats)
        aggregate all spout streams.
        Parameters:
        stats - a Map of {metric -> win -> stream id -> value}
        Returns:
        a Map of {metric -> win -> aggregated value}
      • aggregateBoltStreams

        public static Map<String,​Map> aggregateBoltStreams​(Map<String,​Map> stats)
        aggregate all bolt streams.
        Parameters:
        stats - a Map of {metric -> win -> stream id -> value}
        Returns:
        a Map of {metric -> win -> aggregated value}
      • aggBoltExecWinStats

        public static Map<String,​Object> aggBoltExecWinStats​(Map<String,​Object> accStats,
                                                                   Map<String,​Object> newStats,
                                                                   boolean includeSys)
        aggregate windowed stats from a bolt executor stats with a Map of accumulated stats.
      • aggSpoutExecWinStats

        public static Map<String,​Object> aggSpoutExecWinStats​(Map<String,​Object> accStats,
                                                                    Map<String,​Object> beat,
                                                                    boolean includeSys)
        aggregate windowed stats from a spout executor stats with a Map of accumulated stats.
      • aggregateCounts

        public static <T> Map<String,​Map<T,​Long>> aggregateCounts​(List<Map<String,​Map<T,​Long>>> countsSeq)
        aggregate a list of count maps into one map.
        Parameters:
        countsSeq - a seq of {win -> GlobalStreamId -> value}
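        Merging a list of count maps into one can be sketched as a nested sum: for every window and stream key, the counts from all input maps are added together. CountsMergeSketch and mergeCounts are hypothetical names, and summation is the assumed merge rule.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only; not the StatsUtil implementation.
final class CountsMergeSketch {

    /** Merges a list of {win -> stream -> count} maps by summing counts per key. */
    static <T> Map<String, Map<T, Long>> mergeCounts(List<Map<String, Map<T, Long>>> countsSeq) {
        Map<String, Map<T, Long>> ret = new HashMap<>();
        for (Map<String, Map<T, Long>> counts : countsSeq) {
            for (Map.Entry<String, Map<T, Long>> win : counts.entrySet()) {
                Map<T, Long> acc = ret.computeIfAbsent(win.getKey(), k -> new HashMap<>());
                for (Map.Entry<T, Long> stream : win.getValue().entrySet()) {
                    acc.merge(stream.getKey(), stream.getValue(), Long::sum);
                }
            }
        }
        return ret;
    }

    public static void main(String[] args) {
        Map<String, Long> first = new HashMap<>();
        first.put("default", 3L);
        Map<String, Map<String, Long>> executor1 = new HashMap<>();
        executor1.put("600", first);

        Map<String, Long> second = new HashMap<>();
        second.put("default", 4L);
        second.put("other", 1L);
        Map<String, Map<String, Long>> executor2 = new HashMap<>();
        executor2.put("600", second);

        System.out.println(mergeCounts(Arrays.asList(executor1, executor2)));
    }
}
```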
      • aggregateCompStats

        public static Map<String,​Object> aggregateCompStats​(String window,
                                                                  boolean includeSys,
                                                                  List<Map<String,​Object>> beats,
                                                                  String compType)
        Aggregate the stats for a component over a given window of time.
      • aggCompExecStats

        public static Map<String,​Object> aggCompExecStats​(String window,
                                                                boolean includeSys,
                                                                Map<String,​Object> accStats,
                                                                Map<String,​Object> beat,
                                                                String compType)
        Combines the aggregate stats of one executor with the given map, selecting the appropriate window and including system components as specified.
      • postAggregateCompStats

        public static Map<String,​Object> postAggregateCompStats​(Map<String,​Object> compStats)
        Post-aggregates component stats:
        1. computes execute-latency/process-latency from execute/process latency total
        2. computes windowed weighted avgs
        3. transforms Map keys
        Parameters:
        compStats - accumulated comp stats
      • aggCompExecsStats

        public static ComponentPageInfo aggCompExecsStats​(Map exec2hostPort,
                                                          Map task2component,
                                                          Map<List<Integer>,​Map<String,​Object>> beats,
                                                          String window,
                                                          boolean includeSys,
                                                          String topologyId,
                                                          StormTopology topology,
                                                          String componentId)
        aggregate component executor stats.
        Parameters:
        exec2hostPort - a Map of {executor -> host+port}
        task2component - a Map of {task id -> component}
        beats - a converted HashMap of executor heartbeats, {executor -> heartbeat}
        window - specified window
        includeSys - whether to include system streams
        topologyId - topology id
        topology - storm topology
        componentId - component id
        Returns:
        ComponentPageInfo thrift structure
      • aggWorkerStats

        public static List<WorkerSummary> aggWorkerStats​(String stormId,
                                                         String stormName,
                                                         Map<Integer,​String> task2Component,
                                                         Map<List<Integer>,​Map<String,​Object>> beats,
                                                         Map<List<Long>,​List<Object>> exec2NodePort,
                                                         Map<String,​String> nodeHost,
                                                         Map<WorkerSlot,​WorkerResources> worker2Resources,
                                                         boolean includeSys,
                                                         boolean userAuthorized,
                                                         String filterSupervisor,
                                                         String owner)
        Aggregates statistics per worker for a topology, optionally filtering on specific supervisors.
        Parameters:
        stormId - topology id
        stormName - topology name
        task2Component - a Map of {task id -> component}
        beats - a converted HashMap of executor heartbeats, {executor -> heartbeat}
        exec2NodePort - a Map of {executor -> host+port}
        includeSys - whether to include system streams
        userAuthorized - whether the user is authorized to view topology info
        filterSupervisor - if not null, only return WorkerSummaries for that supervisor
        owner - owner of the topology
      • convertZkExecutorHb

        public static Map<String,​Object> convertZkExecutorHb​(ExecutorBeat beat)
        convert thrift ExecutorBeat into a java HashMap.
      • convertExecutorStats

        public static Map<String,​Object> convertExecutorStats​(ExecutorStats stats)
        convert thrift ExecutorStats structure into a java HashMap.
      • extractNodeInfosFromHbForComp

        public static List<Map<String,​Object>> extractNodeInfosFromHbForComp​(Map<List<? extends Number>,​List<Object>> exec2hostPort,
                                                                                   Map<Integer,​String> task2component,
                                                                                   boolean includeSys,
                                                                                   String compId)
        extract a list of host port info for specified component.
        Parameters:
        exec2hostPort - {executor -> host+port}
        task2component - {task id -> component}
        includeSys - whether to include system streams
        compId - component id
        Returns:
        a list of host+port
      • computeBoltCapacity

        public static double computeBoltCapacity​(List<ExecutorSummary> executorSumms)
        computes max bolt capacity.
        Parameters:
        executorSumms - a list of ExecutorSummary
        Returns:
        max bolt capacity
      • computeExecutorCapacity

        public static double computeExecutorCapacity​(ExecutorSummary summary)
        Computes the capacity of an executor: an approximation of the % of time spent doing real work.
        Parameters:
        summary - the stats for the executor.
        Returns:
        the capacity of the executor.
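        A common way to approximate "the % of time spent doing real work" is to multiply the number of tuples executed by the average execute latency and divide by the length of the measurement window. The sketch below shows that arithmetic with plain values. CapacitySketch and its parameters are hypothetical: the real method takes an ExecutorSummary, and how it derives the window length is not shown here.

```java
// Illustrative sketch only; not the StatsUtil implementation.
final class CapacitySketch {

    /**
     * Fraction of the window spent executing tuples.
     *
     * @param executed            tuples executed during the window (assumed input)
     * @param avgExecuteLatencyMs average execute latency in milliseconds (assumed input)
     * @param windowMs            window length in milliseconds (assumed input)
     */
    static double capacity(long executed, double avgExecuteLatencyMs, long windowMs) {
        if (windowMs <= 0) {
            return 0.0; // assumption: no measurable window means no capacity estimate
        }
        return (executed * avgExecuteLatencyMs) / windowMs;
    }

    public static void main(String[] args) {
        // 300,000 tuples at 1 ms each over a 10-minute (600,000 ms) window.
        System.out.println(capacity(300_000L, 1.0, 600_000L));
    }
}
```

        A value near 1.0 would suggest the executor is busy for almost the whole window.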
      • getFilledStats

        public static List<ExecutorSummary> getFilledStats​(List<ExecutorSummary> summs)
        Filters out ExecutorSummary entries whose stats are null.
        Parameters:
        summs - a list of ExecutorSummary
        Returns:
        filtered summs
      • thriftifyExecutorStats

        public static ExecutorStats thriftifyExecutorStats​(Map stats)
        Convert Executor stats to thrift data structure.
        Parameters:
        stats - the stats in the form of a map.
        Returns:
        the thrift structure for the stats.
      • componentType

        public static String componentType​(StormTopology topology,
                                           String compId)
        Get the component type for a given id.
        Parameters:
        topology - the topology this is a part of.
        compId - the id of the component.
        Returns:
        the type as a String "BOLT" or "SPOUT".
      • floatStr

        public static String floatStr​(Double n)
        Convert a float to a string for display.
        Parameters:
        n - the value to format.
        Returns:
        the string ready for display.
      • errorSubset

        public static String errorSubset​(String errorStr)
      • windowSetConverter

        public static <K> Map windowSetConverter​(Map stats,
                                                 org.apache.storm.stats.ClientStatsUtil.KeyTransformer<K> firstKeyFunc)