Class StatsUtil

java.lang.Object
org.apache.storm.stats.StatsUtil

public class StatsUtil extends Object
  • Field Details

  • Constructor Details

    • StatsUtil

      public StatsUtil()
  • Method Details

    • aggBoltLatAndCount

      public static Map<String,Number> aggBoltLatAndCount(Map<List<String>,Double> id2execAvg, Map<List<String>,Double> id2procAvg, Map<List<String>,Long> id2numExec)
      Aggregates number executed, process latency, and execute latency across all streams.
      Parameters:
      id2execAvg - { global stream id -> exec avg value }, e.g., {["split" "default"] 0.44313}
      id2procAvg - { global stream id -> proc avg value }
      id2numExec - { global stream id -> executed }
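      A minimal sketch of this aggregation, not the Storm source. It assumes each stream's average latency is weighted by that stream's executed count, so the result holds latency totals (the sum of avg * count) plus the summed executed count. The class `BoltLatSketch` and the result keys `execLatTotal`, `procLatTotal`, and `executed` are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration; not org.apache.storm.stats.StatsUtil.
final class BoltLatSketch {
    /** Sums avg * count over every stream that has both an average and a count. */
    static double weightedTotal(Map<List<String>, Double> id2avg, Map<List<String>, Long> id2count) {
        double total = 0.0;
        for (Map.Entry<List<String>, Double> e : id2avg.entrySet()) {
            Long count = id2count.get(e.getKey());
            if (count != null && e.getValue() != null) {
                total += e.getValue() * count;
            }
        }
        return total;
    }

    /** Stand-in for aggBoltLatAndCount: latency totals plus the total executed count. */
    static Map<String, Number> aggregate(Map<List<String>, Double> id2execAvg,
                                         Map<List<String>, Double> id2procAvg,
                                         Map<List<String>, Long> id2numExec) {
        long executed = 0L;
        for (Long n : id2numExec.values()) {
            if (n != null) {
                executed += n;
            }
        }
        Map<String, Number> ret = new HashMap<>();
        ret.put("execLatTotal", weightedTotal(id2execAvg, id2numExec));
        ret.put("procLatTotal", weightedTotal(id2procAvg, id2numExec));
        ret.put("executed", executed);
        return ret;
    }
}
```

      For two streams with execute averages 2.0 and 4.0 and executed counts 10 and 30, `execLatTotal` is 2.0 * 10 + 4.0 * 30 = 140.0; dividing it by `executed` (40) gives the cross-stream average of 3.5.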
    • aggSpoutLatAndCount

      public static Map<String,Number> aggSpoutLatAndCount(Map<String,Double> id2compAvg, Map<String,Long> id2numAcked)
      Aggregates the number acked and the complete latencies across all streams.
    • aggBoltStreamsLatAndCount

      public static <K> Map<K,Map> aggBoltStreamsLatAndCount(Map<K,Double> id2execAvg, Map<K,Double> id2procAvg, Map<K,Long> id2numExec)
      Aggregates the number executed and the process and execute latencies.
    • aggSpoutStreamsLatAndCount

      public static <K> Map<K,Map> aggSpoutStreamsLatAndCount(Map<K,Double> id2compAvg, Map<K,Long> id2acked)
      Aggregates the number acked and the complete latencies.
    • aggPreMergeCompPageBolt

      public static Map<String,Object> aggPreMergeCompPageBolt(Map<String,Object> beat, String window, boolean includeSys)
      Pre-merges component page bolt stats from an executor heartbeat: (1) computes component capacity, (2) converts map keys of stats, and (3) filters streams if necessary.
      Parameters:
      beat - executor heartbeat data
      window - specified window
      includeSys - whether to include system streams
      Returns:
      pre-merged stats
    • aggPreMergeCompPageSpout

      public static Map<String,Object> aggPreMergeCompPageSpout(Map<String,Object> beat, String window, boolean includeSys)
      Pre-merges component page spout stats from an executor heartbeat: (1) computes component capacity, (2) converts map keys of stats, and (3) filters streams if necessary.
      Parameters:
      beat - executor heartbeat data
      window - specified window
      includeSys - whether to include system streams
      Returns:
      pre-merged stats
    • aggPreMergeTopoPageBolt

      public static <K, V extends Number> Map<String,Object> aggPreMergeTopoPageBolt(Map<String,Object> beat, String window, boolean includeSys)
      Pre-merges the component stats of the specified bolt id.
      Parameters:
      beat - executor heartbeat data
      window - specified window
      includeSys - whether to include system streams
      Returns:
      { comp id -> comp-stats }
    • aggPreMergeTopoPageSpout

      public static <K, V extends Number> Map<String,Object> aggPreMergeTopoPageSpout(Map<String,Object> m, String window, boolean includeSys)
      Pre-merges the component stats of the specified spout id and returns { comp id -> comp-stats }.
    • mergeAggCompStatsCompPageBolt

      public static Map<String,Object> mergeAggCompStatsCompPageBolt(Map<String,Object> accBoltStats, Map<String,Object> boltStats)
      Merges accumulated bolt stats with pre-merged component stats.
      Parameters:
      accBoltStats - accumulated bolt stats
      boltStats - pre-merged component stats
      Returns:
      merged stats
    • mergeAggCompStatsCompPageSpout

      public static Map<String,Object> mergeAggCompStatsCompPageSpout(Map<String,Object> accSpoutStats, Map<String,Object> spoutStats)
      Merges accumulated spout stats with pre-merged component stats.
    • mergeAggCompStatsTopoPageBolt

      public static Map<String,Object> mergeAggCompStatsTopoPageBolt(Map<String,Object> accBoltStats, Map<String,Object> boltStats)
      Merges accumulated bolt stats with new bolt stats.
      Parameters:
      accBoltStats - accumulated bolt stats
      boltStats - new input bolt stats
      Returns:
      merged bolt stats
    • mergeAggCompStatsTopoPageSpout

      public static Map<String,Object> mergeAggCompStatsTopoPageSpout(Map<String,Object> accSpoutStats, Map<String,Object> spoutStats)
      Merges accumulated spout stats with new spout stats.
    • aggTopoExecStats

      public static Map<String,Object> aggTopoExecStats(String window, boolean includeSys, Map<String,Object> accStats, Map<String,Object> beat, String compType)
      A helper function that does the common work to aggregate stats of one executor with the given map for the topology page.
    • aggTopoExecsStats

      public static TopologyPageInfo aggTopoExecsStats(String topologyId, Map exec2nodePort, Map task2component, Map<List<Integer>,Map<String,Object>> beats, StormTopology topology, String window, boolean includeSys, IStormClusterState clusterState)
      Aggregates the stats of all executors in a topology.
      Parameters:
      topologyId - topology id
      exec2nodePort - executor -> host+port
      task2component - task -> component
      beats - executor[start, end] -> executor heartbeat
      topology - storm topology
      window - the window to be aggregated
      includeSys - whether to include system streams
      clusterState - cluster state
      Returns:
      TopologyPageInfo thrift structure
    • aggregateBoltStats

      public static <T> Map<String,Map> aggregateBoltStats(List<ExecutorSummary> statsSeq, boolean includeSys)
      Aggregates bolt stats.
      Parameters:
      statsSeq - a list of ExecutorSummary
      includeSys - whether to include system streams
      Returns:
      aggregated bolt stats: {metric -> win -> global stream id -> value}
    • aggregateSpoutStats

      public static Map<String,Map> aggregateSpoutStats(List<ExecutorSummary> statsSeq, boolean includeSys)
      Aggregates spout stats.
      Parameters:
      statsSeq - a list of ExecutorSummary
      includeSys - whether to include system streams
      Returns:
      aggregated spout stats: {metric -> win -> global stream id -> value}
    • aggregateCommonStats

      public static <T> Map<String,Map<String,Map<T,Long>>> aggregateCommonStats(List<ExecutorSummary> statsSeq)
      Aggregates the stats common to spouts and bolts; called by aggregateSpoutStats and aggregateBoltStats.
    • preProcessStreamSummary

      public static <T> Map<String,Map<String,Map<T,Long>>> preProcessStreamSummary(Map<String,Map<String,Map<T,Long>>> streamSummary, boolean includeSys)
      Filters system streams out of aggregated spout/bolt stats unless includeSys is true.
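      A minimal sketch of the filtering step, assuming system streams are identified by a leading double underscore (as in Storm's `__ack_ack` stream) and that stream ids are plain strings rather than global stream ids. `SystemStreamFilterSketch` is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration; not org.apache.storm.stats.StatsUtil.
final class SystemStreamFilterSketch {
    /** Assumed convention: system stream ids start with "__". */
    static boolean isSystemStream(String streamId) {
        return streamId.startsWith("__");
    }

    /** Drops system streams from {metric -> win -> stream -> value} unless includeSys is true. */
    static Map<String, Map<String, Map<String, Long>>> filter(
            Map<String, Map<String, Map<String, Long>>> summary, boolean includeSys) {
        if (includeSys) {
            return summary;
        }
        Map<String, Map<String, Map<String, Long>>> out = new HashMap<>();
        for (Map.Entry<String, Map<String, Map<String, Long>>> metric : summary.entrySet()) {
            Map<String, Map<String, Long>> byWindow = new HashMap<>();
            for (Map.Entry<String, Map<String, Long>> win : metric.getValue().entrySet()) {
                Map<String, Long> streams = new HashMap<>();
                for (Map.Entry<String, Long> s : win.getValue().entrySet()) {
                    if (!isSystemStream(s.getKey())) {
                        streams.put(s.getKey(), s.getValue());
                    }
                }
                byWindow.put(win.getKey(), streams);
            }
            out.put(metric.getKey(), byWindow);
        }
        return out;
    }
}
```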
    • aggregateCountStreams

      public static <K, V extends Number> Map<String,Long> aggregateCountStreams(Map<String,Map<K,V>> stats)
      Aggregates count streams by window.
      Parameters:
      stats - a Map of {win -> stream -> value}
      Returns:
      a Map of {win -> value}
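      Assuming each window's total is the plain sum of its per-stream values, the transformation can be sketched as below. `CountStreamsSketch` is hypothetical, and the window keys `"600"` and `":all-time"` are illustrative.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration; not org.apache.storm.stats.StatsUtil.
final class CountStreamsSketch {
    /** Collapses {win -> stream -> count} into {win -> total count} by summing over streams. */
    static <K, V extends Number> Map<String, Long> sumByWindow(Map<String, Map<K, V>> stats) {
        Map<String, Long> ret = new HashMap<>();
        for (Map.Entry<String, Map<K, V>> win : stats.entrySet()) {
            long sum = 0L;
            for (V v : win.getValue().values()) {
                if (v != null) {
                    sum += v.longValue();
                }
            }
            ret.put(win.getKey(), sum);
        }
        return ret;
    }
}
```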
    • aggregateAverages

      public static <K> Map<String,Map<K,Double>> aggregateAverages(List<Map<String,Map<K,Double>>> avgSeq, List<Map<String,Map<K,Long>>> countSeq)
      Computes a weighted average from a list of average maps and the corresponding count maps, both extracted from a list of ExecutorSummary.
      Parameters:
      avgSeq - a list of {win -> global stream id -> avg value}
      countSeq - a list of {win -> global stream id -> count value}
      Returns:
      a Map of {win -> global stream id -> weighted avg value}
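      The weighted average can be sketched as sum(avg * count) / sum(count), computed per window and per stream across all executors. This is an illustration under that assumption; `WeightedAverageSketch` is hypothetical, and returning 0.0 for a stream whose counts sum to zero is this sketch's own choice.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration; not org.apache.storm.stats.StatsUtil.
final class WeightedAverageSketch {
    static <K> Map<String, Map<K, Double>> weightedAverages(
            List<Map<String, Map<K, Double>>> avgSeq,
            List<Map<String, Map<K, Long>>> countSeq) {
        // Running sums of avg * count and of count, keyed by window, then stream.
        Map<String, Map<K, Double>> weightedSums = new HashMap<>();
        Map<String, Map<K, Long>> totalCounts = new HashMap<>();
        for (int i = 0; i < avgSeq.size(); i++) {
            Map<String, Map<K, Long>> counts = countSeq.get(i);
            for (Map.Entry<String, Map<K, Double>> win : avgSeq.get(i).entrySet()) {
                Map<K, Long> winCounts = counts.getOrDefault(win.getKey(), Map.of());
                for (Map.Entry<K, Double> s : win.getValue().entrySet()) {
                    long c = winCounts.getOrDefault(s.getKey(), 0L);
                    weightedSums.computeIfAbsent(win.getKey(), w -> new HashMap<>())
                            .merge(s.getKey(), s.getValue() * c, Double::sum);
                    totalCounts.computeIfAbsent(win.getKey(), w -> new HashMap<>())
                            .merge(s.getKey(), c, Long::sum);
                }
            }
        }
        // Divide each weighted sum by its total count.
        Map<String, Map<K, Double>> ret = new HashMap<>();
        for (Map.Entry<String, Map<K, Double>> win : weightedSums.entrySet()) {
            Map<K, Long> winTotals = totalCounts.get(win.getKey());
            Map<K, Double> avgs = new HashMap<>();
            for (Map.Entry<K, Double> s : win.getValue().entrySet()) {
                long total = winTotals.get(s.getKey());
                // This sketch reports 0.0 when a stream has no counted tuples.
                avgs.put(s.getKey(), total > 0 ? s.getValue() / total : 0.0);
            }
            ret.put(win.getKey(), avgs);
        }
        return ret;
    }
}
```

      Two executors reporting averages 2.0 and 5.0 for the same stream, with counts 10 and 20, merge to (2.0 * 10 + 5.0 * 20) / 30 = 4.0 rather than the unweighted 3.5.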
    • aggregateAvgStreams

      public static <K> Map<String,Double> aggregateAvgStreams(Map<String,Map<K,Double>> avgs, Map<String,Map<K,Long>> counts)
      Computes the weighted average across all streams.
      Parameters:
      avgs - a Map of {win -> stream -> average value}
      counts - a Map of {win -> stream -> count value}
      Returns:
      a Map of {win -> aggregated value}
    • spoutStreamsStats

      public static Map<String,Map> spoutStreamsStats(List<ExecutorSummary> summs, boolean includeSys)
      Aggregates spout stream stats and returns a Map of {metric -> win -> aggregated value}.
    • boltStreamsStats

      public static Map<String,Map> boltStreamsStats(List<ExecutorSummary> summs, boolean includeSys)
      Aggregates bolt stream stats and returns a Map of {metric -> win -> aggregated value}.
    • aggregateSpoutStreams

      public static Map<String,Map> aggregateSpoutStreams(Map<String,Map> stats)
      Aggregates all spout streams.
      Parameters:
      stats - a Map of {metric -> win -> stream id -> value}
      Returns:
      a Map of {metric -> win -> aggregated value}
    • aggregateBoltStreams

      public static Map<String,Map> aggregateBoltStreams(Map<String,Map> stats)
      Aggregates all bolt streams.
      Parameters:
      stats - a Map of {metric -> win -> stream id -> value}
      Returns:
      a Map of {metric -> win -> aggregated value}
    • aggBoltExecWinStats

      public static Map<String,Object> aggBoltExecWinStats(Map<String,Object> accStats, Map<String,Object> newStats, boolean includeSys)
      Aggregates the windowed stats of a bolt executor into a Map of accumulated stats.
    • aggSpoutExecWinStats

      public static Map<String,Object> aggSpoutExecWinStats(Map<String,Object> accStats, Map<String,Object> beat, boolean includeSys)
      Aggregates the windowed stats of a spout executor into a Map of accumulated stats.
    • aggregateCounts

      public static <T> Map<String,Map<T,Long>> aggregateCounts(List<Map<String,Map<T,Long>>> countsSeq)
      Aggregates a list of count maps into one map.
      Parameters:
      countsSeq - a list of {win -> GlobalStreamId -> value}
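      Merging count maps amounts to summing matching {win -> stream} entries. A minimal sketch, assuming summation is the combining rule; `CountMergeSketch` is a hypothetical name.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration; not org.apache.storm.stats.StatsUtil.
final class CountMergeSketch {
    /** Sums a list of {win -> stream -> count} maps key by key into one map. */
    static <T> Map<String, Map<T, Long>> mergeCounts(List<Map<String, Map<T, Long>>> countsSeq) {
        Map<String, Map<T, Long>> ret = new HashMap<>();
        for (Map<String, Map<T, Long>> counts : countsSeq) {
            for (Map.Entry<String, Map<T, Long>> win : counts.entrySet()) {
                Map<T, Long> acc = ret.computeIfAbsent(win.getKey(), w -> new HashMap<>());
                for (Map.Entry<T, Long> s : win.getValue().entrySet()) {
                    // Add this executor's count to any count already seen for the stream.
                    acc.merge(s.getKey(), s.getValue(), Long::sum);
                }
            }
        }
        return ret;
    }
}
```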
    • aggregateCompStats

      public static Map<String,Object> aggregateCompStats(String window, boolean includeSys, List<Map<String,Object>> beats, String compType)
      Aggregate the stats for a component over a given window of time.
    • aggCompExecStats

      public static Map<String,Object> aggCompExecStats(String window, boolean includeSys, Map<String,Object> accStats, Map<String,Object> beat, String compType)
      Combines the aggregate stats of one executor with the given map, selecting the appropriate window and including system components as specified.
    • postAggregateCompStats

      public static Map<String,Object> postAggregateCompStats(Map<String,Object> compStats)
      Post-aggregates component stats: (1) computes execute latency and process latency from the execute and process latency totals, (2) computes windowed weighted averages, and (3) transforms Map keys.
      Parameters:
      compStats - accumulated comp stats
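      Step 1 can be sketched as dividing each accumulated latency total by the executed count. This assumes a total is the count-weighted sum of averages; the keys `execLatTotal`, `procLatTotal`, `executed`, `executeLatency`, and `processLatency`, the class `PostAggregateSketch`, and the 0.0 result for a zero count are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration; not org.apache.storm.stats.StatsUtil.
final class PostAggregateSketch {
    /** Average from a count-weighted total; 0.0 when nothing was counted. */
    static double divOrZero(double total, long count) {
        return count > 0 ? total / count : 0.0;
    }

    /** Turns hypothetical latency totals plus an executed count into average latencies. */
    static Map<String, Double> latencies(Map<String, Number> acc) {
        long executed = acc.getOrDefault("executed", 0L).longValue();
        Map<String, Double> ret = new HashMap<>();
        ret.put("executeLatency", divOrZero(acc.getOrDefault("execLatTotal", 0.0).doubleValue(), executed));
        ret.put("processLatency", divOrZero(acc.getOrDefault("procLatTotal", 0.0).doubleValue(), executed));
        return ret;
    }
}
```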
    • aggCompExecsStats

      public static ComponentPageInfo aggCompExecsStats(Map exec2hostPort, Map task2component, Map<List<Integer>,Map<String,Object>> beats, String window, boolean includeSys, String topologyId, StormTopology topology, String componentId)
      Aggregates the executor stats of a component.
      Parameters:
      exec2hostPort - a Map of {executor -> host+port}
      task2component - a Map of {task id -> component}
      beats - a converted HashMap of executor heartbeats, {executor -> heartbeat}
      window - specified window
      includeSys - whether to include system streams
      topologyId - topology id
      topology - storm topology
      componentId - component id
      Returns:
      ComponentPageInfo thrift structure
    • aggWorkerStats

      public static List<WorkerSummary> aggWorkerStats(String stormId, String stormName, Map<Integer,String> task2Component, Map<List<Integer>,Map<String,Object>> beats, Map<List<Long>,List<Object>> exec2NodePort, Map<String,String> nodeHost, Map<WorkerSlot,WorkerResources> worker2Resources, boolean includeSys, boolean userAuthorized, String filterSupervisor, String owner)
      Aggregates statistics per worker for a topology, optionally filtering on a specific supervisor.
      Parameters:
      stormId - topology id
      stormName - topology name
      task2Component - a Map of {task id -> component}
      beats - a converted HashMap of executor heartbeats, {executor -> heartbeat}
      exec2NodePort - a Map of {executor -> host+port}
      includeSys - whether to include system streams
      userAuthorized - whether the user is authorized to view topology info
      filterSupervisor - if not null, only return WorkerSummaries for that supervisor
      owner - owner of the topology
    • convertExecutorBeats

      public static Map<List<Integer>,Map<String,Object>> convertExecutorBeats(Map<ExecutorInfo,ExecutorBeat> beats)
      Converts thrift executor heartbeats into a Java HashMap.
    • convertWorkerBeats

      public static Map<List<Integer>,Map<String,Object>> convertWorkerBeats(SupervisorWorkerHeartbeat workerHeartbeat)
      Converts a SupervisorWorkerHeartbeat into executor heartbeats in the format of Nimbus's local reports.
    • convertZkExecutorHb

      public static Map<String,Object> convertZkExecutorHb(ExecutorBeat beat)
      Converts a thrift ExecutorBeat into a Java HashMap.
    • convertZkWorkerHb

      public static Map<String,Object> convertZkWorkerHb(ClusterWorkerHeartbeat workerHb)
      Converts a thrift worker heartbeat into a Java HashMap.
    • convertExecutorsStats

      public static Map<List<Integer>,ExecutorStats> convertExecutorsStats(Map<ExecutorInfo,ExecutorStats> stats)
      Converts executor stats into a HashMap; the ExecutorStats values themselves remain unchanged.
    • convertExecutorStats

      public static Map<String,Object> convertExecutorStats(ExecutorStats stats)
      Converts a thrift ExecutorStats structure into a Java HashMap.
    • extractNodeInfosFromHbForComp

      public static List<Map<String,Object>> extractNodeInfosFromHbForComp(Map<List<? extends Number>,List<Object>> exec2hostPort, Map<Integer,String> task2component, boolean includeSys, String compId)
      Extracts a list of host and port info for the specified component.
      Parameters:
      exec2hostPort - {executor -> host+port}
      task2component - {task id -> component}
      includeSys - whether to include system streams
      compId - component id
      Returns:
      a list of host+port
    • extractDataFromHb

      public static List<Map<String,Object>> extractDataFromHb(Map executor2hostPort, Map task2component, Map<List<Integer>,Map<String,Object>> beats, boolean includeSys, StormTopology topology)
      Extracts a list of executor data from heartbeats.
    • extractDataFromHb

      public static List<Map<String,Object>> extractDataFromHb(Map executor2hostPort, Map task2component, Map<List<Integer>,Map<String,Object>> beats, boolean includeSys, StormTopology topology, String compId)
      Extracts a list of executor data from heartbeats for the component compId.
    • computeBoltCapacity

      public static double computeBoltCapacity(List<ExecutorSummary> executorSumms)
      Computes the maximum capacity among the given bolt executors.
      Parameters:
      executorSumms - a list of ExecutorSummary
      Returns:
      max bolt capacity
    • computeExecutorCapacity

      public static double computeExecutorCapacity(ExecutorSummary summary)
      Computes the capacity of an executor, an approximation of the percentage of time it spends doing real work.
      Parameters:
      summary - the stats for the executor.
      Returns:
      the capacity of the executor.
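      A sketch of the approximation, assuming capacity = executed * average execute latency (ms) / (window length (s) * 1000), where the window is the shorter of the executor's uptime and ten minutes. A value near 1.0 means the executor was busy for almost the whole window. `CapacitySketch`, its constant, and `maxCapacity` (mirroring the maximum that computeBoltCapacity reports) are hypothetical names, not Storm's code.

```java
// Hypothetical illustration; not org.apache.storm.stats.StatsUtil.
final class CapacitySketch {
    /** Assumed measurement window: ten minutes, in seconds. */
    static final int TEN_MIN_SECS = 600;

    /** Approximate fraction of the window spent executing tuples. */
    static double capacity(long executed, double execLatencyMs, int uptimeSecs) {
        // Use the shorter of the executor's uptime and the ten-minute window.
        int windowSecs = Math.min(uptimeSecs, TEN_MIN_SECS);
        if (windowSecs <= 0) {
            return 0.0;
        }
        // Busy time in ms divided by window length in ms.
        return (executed * execLatencyMs) / (windowSecs * 1000.0);
    }

    /** Largest of the given per-executor capacities (0.0 if none are given). */
    static double maxCapacity(double... capacities) {
        double max = 0.0;
        for (double c : capacities) {
            max = Math.max(max, c);
        }
        return max;
    }
}
```

      For example, 300,000 tuples executed at 1.0 ms each over a full 600 s window give 300,000 ms / 600,000 ms = 0.5.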
    • getFilledStats

      public static List<ExecutorSummary> getFilledStats(List<ExecutorSummary> summs)
      Filters out ExecutorSummary entries whose stats are null.
      Parameters:
      summs - a list of ExecutorSummary
      Returns:
      filtered summs
    • thriftifyRpcWorkerHb

      public static SupervisorWorkerHeartbeat thriftifyRpcWorkerHb(String stormId, List<Long> executorId)
      Used for local testing.
    • thriftifyExecutorStats

      public static ExecutorStats thriftifyExecutorStats(Map stats)
      Converts executor stats to a thrift data structure.
      Parameters:
      stats - the stats in the form of a map.
      Returns:
      the thrift structure for the stats.
    • componentType

      public static String componentType(StormTopology topology, String compId)
      Gets the component type for a given id.
      Parameters:
      topology - the topology this is a part of.
      compId - the id of the component.
      Returns:
      the type as a String "BOLT" or "SPOUT".
    • floatStr

      public static String floatStr(Double n)
      Convert a float to a string for display.
      Parameters:
      n - the value to format.
      Returns:
      the string ready for display.
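      A possible formatting rule, assuming three decimal places and a `"0"` fallback for a null value; both the precision and the fallback are assumptions of this sketch, and `FloatStrSketch` is a hypothetical name.

```java
import java.util.Locale;

// Hypothetical illustration; not org.apache.storm.stats.StatsUtil.
final class FloatStrSketch {
    static String floatStr(Double n) {
        if (n == null) {
            return "0";
        }
        // Locale.ROOT keeps '.' as the decimal separator regardless of the JVM default locale.
        return String.format(Locale.ROOT, "%.3f", n);
    }
}
```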
    • errorSubset

      public static String errorSubset(String errorStr)
    • windowSetConverter

      public static <K> Map windowSetConverter(Map stats, org.apache.storm.stats.ClientStatsUtil.KeyTransformer<K> firstKeyFunc)