Class Utils


  • public class Utils
    extends Object
    • Field Detail

      • LOG

        public static final org.slf4j.Logger LOG
      • BLOB_KEY_PATTERN

        public static final Pattern BLOB_KEY_PATTERN
    • Constructor Detail

      • Utils

        public Utils()
    • Method Detail

      • setInstance

        public static Utils setInstance​(Utils u)
        Provide an instance of this class for delegates to use. To mock out delegated methods, provide an instance of a subclass that overrides the implementation of the delegated method.
        Parameters:
        u - a Utils instance
        Returns:
        the previously set instance
      • setClassLoaderForJavaDeSerialize

        public static void setClassLoaderForJavaDeSerialize​(ClassLoader cl)
      • resetClassLoaderForJavaDeSerialize

        public static void resetClassLoaderForJavaDeSerialize()
      • findResources

        public static List<URL> findResources​(String name)
      • findAndReadConfigFile

        public static Map<String,​Object> findAndReadConfigFile​(String name,
                                                                     boolean mustExist)
      • readDefaultConfig

        public static Map<String,​Object> readDefaultConfig()
      • urlEncodeUtf8

        public static String urlEncodeUtf8​(String s)
        URL encode the given string using the UTF-8 charset. Once Storm is baselined to Java 11, we can use URLEncoder.encode(String, Charset) instead, which obsoletes this method.
      • urlDecodeUtf8

        public static String urlDecodeUtf8​(String s)
        URL decode the given string using the UTF-8 charset. Once Storm is baselined to Java 11, we can use URLDecoder.decode(String, Charset) instead, which obsoletes this method.
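        The replacement mentioned above can be sketched as follows. This is a minimal illustration and not Storm's code; the class name UrlCodecSketch is hypothetical. It relies on the Charset overloads of URLEncoder.encode and URLDecoder.decode, which avoid the checked UnsupportedEncodingException of the older String-based overloads.

```java
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

final class UrlCodecSketch {
    // The Charset overloads need no checked-exception handling.
    static String urlEncodeUtf8(String s) {
        return URLEncoder.encode(s, StandardCharsets.UTF_8);
    }

    static String urlDecodeUtf8(String s) {
        return URLDecoder.decode(s, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String encoded = urlEncodeUtf8("a b&c=d");
        System.out.println(encoded);                // a+b%26c%3Dd
        System.out.println(urlDecodeUtf8(encoded)); // a b&c=d
    }
}
```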
      • readCommandLineOpts

        public static Map<String,​Object> readCommandLineOpts()
      • bitXorVals

        public static long bitXorVals​(List<Long> coll)
      • bitXor

        public static long bitXor​(Long a,
                                  Long b)
      • addShutdownHookWithForceKillIn1Sec

        public static void addShutdownHookWithForceKillIn1Sec​(Runnable func)
        Adds the user-supplied function as a shutdown hook for cleanup. Also adds a function that sleeps for one second and then halts the runtime, so that no zombie process is left behind if the cleanup function hangs.
      • addShutdownHookWithDelayedForceKill

        public static void addShutdownHookWithDelayedForceKill​(Runnable func,
                                                               int numSecs)
        Adds the user-supplied function as a shutdown hook for cleanup. Also adds a function that sleeps for numSecs seconds and then halts the runtime, so that no zombie process is left behind if the cleanup function hangs.
      • isSystemId

        public static boolean isSystemId​(String id)
      • asyncLoop

        public static Utils.SmartThread asyncLoop​(Callable afn,
                                                  boolean isDaemon,
                                                  Thread.UncaughtExceptionHandler eh,
                                                  int priority,
                                                  boolean isFactory,
                                                  boolean startImmediately,
                                                  String threadName)
        Creates a thread that calls the given code repeatedly, sleeping for an interval of seconds equal to the return value of the previous call.

        The given afn may be a Callable that returns the number of seconds to sleep, or it may be a Callable that returns another Callable that in turn returns the number of seconds to sleep. In the latter case, isFactory must be true.

        Parameters:
        afn - the code to call on each iteration
        isDaemon - whether the new thread should be a daemon thread
        eh - code to call when afn throws an exception
        priority - the new thread's priority
        isFactory - whether afn returns a callable instead of sleep seconds
        startImmediately - whether to start the thread before returning
        threadName - a suffix to be appended to the thread name
        Returns:
        the newly created thread
        See Also:
        Thread
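        The looping behaviour described above (call the code, sleep for the returned number of seconds, repeat) can be sketched in plain Java. This is not Storm's implementation: the class AsyncLoopSketch and the helper runCounted are hypothetical, the sketch returns a plain Thread rather than a Utils.SmartThread, it omits the isFactory, priority and handler parameters, and it assumes that a null return value ends the loop.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

final class AsyncLoopSketch {
    // Calls fn repeatedly, sleeping for the number of seconds it returns.
    // Assumption of this sketch: a null return value ends the loop.
    static Thread asyncLoop(Callable<Long> fn, boolean isDaemon, String threadName) {
        Thread t = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    Long sleepSecs = fn.call();
                    if (sleepSecs == null) {
                        return;
                    }
                    if (sleepSecs > 0) {
                        Thread.sleep(sleepSecs * 1000L);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // treat interruption as a stop request
            } catch (Exception e) {
                throw new RuntimeException(e); // reaches the thread's uncaught exception handler
            }
        }, threadName);
        t.setDaemon(isDaemon);
        t.start();
        return t;
    }

    // Runs a loop that stops after the given number of calls and reports the call count.
    static int runCounted(int iterations) {
        AtomicInteger calls = new AtomicInteger();
        Thread t = asyncLoop(() -> calls.incrementAndGet() < iterations ? 0L : null, true, "sketch-loop");
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(runCounted(3)); // 3
    }
}
```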
      • asyncLoop

        public static Utils.SmartThread asyncLoop​(Callable afn,
                                                  String threadName,
                                                  Thread.UncaughtExceptionHandler eh)
        Convenience method used when only the function, name suffix, and exception handler are given.
        Parameters:
        afn - the code to call on each iteration
        threadName - a suffix to be appended to the thread name
        eh - code to call when afn throws an exception
        Returns:
        the newly created thread
        See Also:
        Thread
      • asyncLoop

        public static Utils.SmartThread asyncLoop​(Callable afn)
        Convenience method used when only the function is given.
        Parameters:
        afn - the code to call on each iteration
        Returns:
        the newly created thread
      • exceptionCauseIsInstanceOf

        public static boolean exceptionCauseIsInstanceOf​(Class klass,
                                                         Throwable throwable)
        Checks if a throwable, or any throwable in its chain of causes, is an instance of a particular class.
        Parameters:
        klass - The class you're expecting
        throwable - The throwable whose cause chain is inspected for an instance of klass
        Returns:
        true if throwable or one of its causes is an instance of klass, false otherwise.
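        As the method name suggests, a check like this typically inspects the cause chain rather than only the outermost throwable. A minimal sketch under that assumption follows; the class CauseCheckSketch and the method name causeIsInstanceOf are hypothetical and this is not Storm's code.

```java
import java.io.IOException;

final class CauseCheckSketch {
    // Walks the cause chain and reports whether any link is an instance of klass.
    static boolean causeIsInstanceOf(Class<?> klass, Throwable throwable) {
        for (Throwable t = throwable; t != null; t = t.getCause()) {
            if (klass.isInstance(t)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Throwable wrapped = new RuntimeException(new IOException("disk"));
        System.out.println(causeIsInstanceOf(IOException.class, wrapped));          // true
        System.out.println(causeIsInstanceOf(InterruptedException.class, wrapped)); // false
    }
}
```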
      • secureRandomLong

        public static long secureRandomLong()
      • hostname

        public static String hostname()
                               throws UnknownHostException
        Gets the storm.local.hostname value, or tries to figure out the local hostname if it is not set in the config.
        Returns:
        a string representation of the hostname.
        Throws:
        UnknownHostException
      • exitProcess

        public static void exitProcess​(int val,
                                       String msg)
      • uuid

        public static String uuid()
      • javaSerialize

        public static byte[] javaSerialize​(Object obj)
      • javaDeserialize

        public static <T> T javaDeserialize​(byte[] serialized,
                                            Class<T> clazz)
      • get

        public static <S,​T> T get​(Map<S,​T> m,
                                        S key,
                                        T def)
      • zeroIfNaNOrInf

        public static double zeroIfNaNOrInf​(double x)
      • parseZkId

        public static org.apache.storm.shade.org.apache.zookeeper.data.Id parseZkId​(String id,
                                                                                    String configName)
      • getSuperUserAcl

        public static org.apache.storm.shade.org.apache.zookeeper.data.ACL getSuperUserAcl​(Map<String,​Object> conf)
        Get the super user ACL used by Nimbus and the Supervisor. This assumes that security is enabled.
        Parameters:
        conf - the config to get the super User ACL from
        Returns:
        the super user ACL.
      • getWorkerACL

        public static List<org.apache.storm.shade.org.apache.zookeeper.data.ACL> getWorkerACL​(Map<String,​Object> conf)
        Get the ZK ACLs that a worker should use when writing to ZK.
        Parameters:
        conf - the config for the topology.
        Returns:
        the ACLs
      • isZkAuthenticationConfiguredTopology

        public static boolean isZkAuthenticationConfiguredTopology​(Map<String,​Object> conf)
        Checks whether the topology is configured to use ZooKeeper authentication.
        Parameters:
        conf - the topology configuration
        Returns:
        true if ZK authentication is configured, else false
      • handleUncaughtException

        public static void handleUncaughtException​(Throwable t,
                                                   Set<Class<?>> allowedExceptions,
                                                   boolean worker)
        Handles uncaught exceptions.
        Parameters:
        worker - true if this is for handling worker exceptions
      • handleUncaughtException

        public static void handleUncaughtException​(Throwable t)
      • handleWorkerUncaughtException

        public static void handleWorkerUncaughtException​(Throwable t)
      • thriftSerialize

        public static byte[] thriftSerialize​(org.apache.storm.thrift.TBase t)
      • thriftDeserialize

        public static <T> T thriftDeserialize​(Class<T> c,
                                              byte[] b)
      • thriftDeserialize

        public static <T> T thriftDeserialize​(Class<T> c,
                                              byte[] b,
                                              int offset,
                                              int length)
      • sleepNoSimulation

        public static void sleepNoSimulation​(long millis)
      • sleep

        public static void sleep​(long millis)
      • reverseMap

        public static <K,​V> HashMap<V,​List<K>> reverseMap​(Map<K,​V> map)
        "{:a 1 :b 1 :c 2} -> {1 [:a :b] 2 [:c]}".

        Example usage in Java: Map<Integer, String> tasks; Map<String, List<Integer>> componentTasks = Utils.reverseMap(tasks);

        The order of the resulting list values depends on the ordering properties of the Map passed in. The caller is responsible for passing an ordered map if they expect the result to be consistently ordered as well.

        Parameters:
        map - to reverse
        Returns:
        a reversed map
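        The reversal described above groups keys by their value. A minimal plain-Java sketch of that behaviour follows; the class ReverseMapSketch is hypothetical and the code is an illustration, not Storm's implementation. A LinkedHashMap is used as input so that the order of the resulting lists is predictable.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ReverseMapSketch {
    // Groups the keys of map by their value, preserving the map's iteration order.
    static <K, V> HashMap<V, List<K>> reverseMap(Map<K, V> map) {
        HashMap<V, List<K>> reversed = new HashMap<>();
        for (Map.Entry<K, V> e : map.entrySet()) {
            reversed.computeIfAbsent(e.getValue(), v -> new ArrayList<>()).add(e.getKey());
        }
        return reversed;
    }

    public static void main(String[] args) {
        Map<Integer, String> tasks = new LinkedHashMap<>();
        tasks.put(1, "spout");
        tasks.put(2, "spout");
        tasks.put(3, "bolt");
        Map<String, List<Integer>> componentTasks = reverseMap(tasks);
        System.out.println(componentTasks.get("spout")); // [1, 2]
        System.out.println(componentTasks.get("bolt"));  // [3]
    }
}
```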
      • reverseMap

        public static Map<Object,​List<Object>> reverseMap​(List<List<Object>> listSeq)
        "[[:a 1] [:b 1] [:c 2]] -> {1 [:a :b] 2 [:c]}". Reverses an assoc-list style Map, like reverseMap(Map...).
        Parameters:
        listSeq - to reverse
        Returns:
        a reversed map
      • isOnWindows

        public static boolean isOnWindows()
      • checkFileExists

        public static boolean checkFileExists​(String path)
      • forceDelete

        public static void forceDelete​(String path)
                                throws IOException
        Deletes a file or directory and its contents if it exists. Does not complain if the input is null or does not exist.
        Parameters:
        path - the path to the file or directory
        Throws:
        IOException
      • serialize

        public static byte[] serialize​(Object obj)
      • deserialize

        public static <T> T deserialize​(byte[] serialized,
                                        Class<T> clazz)
      • serializeToString

        public static String serializeToString​(Object obj)
        Serialize an object using the configured serialization and then base64 encode it into a string.
        Parameters:
        obj - the object to encode
        Returns:
        a string with the encoded object in it.
      • deserializeFromString

        public static <T> T deserializeFromString​(String str,
                                                  Class<T> clazz)
        Deserialize an object stored in a string. The String is assumed to be a base64 encoded string containing the bytes to actually deserialize.
        Parameters:
        str - the encoded string.
        clazz - the thrift class we are expecting.
        Returns:
        the decoded object
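        The serialize-then-base64 round trip described for serializeToString and deserializeFromString can be sketched as below. Storm uses its configured serialization; this sketch assumes plain Java serialization instead, and the class Base64SerdeSketch is hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Base64;

final class Base64SerdeSketch {
    // Assumption: plain Java serialization stands in for the configured serializer.
    static String serializeToString(Object obj) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
            oos.flush();
            return Base64.getEncoder().encodeToString(bos.toByteArray());
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    static <T> T deserializeFromString(String str, Class<T> clazz) {
        byte[] bytes = Base64.getDecoder().decode(str);
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return clazz.cast(ois.readObject());
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        String encoded = serializeToString("hello");
        System.out.println(deserializeFromString(encoded, String.class)); // hello
    }
}
```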
      • toByteArray

        public static byte[] toByteArray​(ByteBuffer buffer)
      • mkSuicideFn

        public static Runnable mkSuicideFn()
      • readAndLogStream

        public static void readAndLogStream​(String prefix,
                                            InputStream in)
      • gzip

        public static byte[] gzip​(byte[] data)
      • gunzip

        public static byte[] gunzip​(byte[] data)
      • toPositive

        public static int toPositive​(int number)
        A cheap way to deterministically convert a number to a positive value. When the input is positive, the original value is returned. When the input is negative, the returned value is the bitwise AND of the original value and Integer.MAX_VALUE (0x7fffffff), which is not its absolute value.
        Parameters:
        number - a given number
        Returns:
        a positive number.
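        The bitwise AND described above simply clears the sign bit. A minimal sketch follows (the class ToPositiveSketch is hypothetical); it also shows how the result differs from Math.abs and that the result is 0 for an input of 0 or Integer.MIN_VALUE.

```java
final class ToPositiveSketch {
    // Clears the sign bit; all other bits are kept as they are.
    static int toPositive(int number) {
        return number & Integer.MAX_VALUE;
    }

    public static void main(String[] args) {
        System.out.println(toPositive(42));                // 42
        System.out.println(toPositive(-1));                // 2147483647, whereas Math.abs(-1) is 1
        System.out.println(toPositive(Integer.MIN_VALUE)); // 0
    }
}
```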
      • processPid

        public static String processPid()
        Get process PID.
        Returns:
        the pid of this JVM, because Java doesn't provide a real way to do this.
      • fromCompressedJsonConf

        public static Map<String,​Object> fromCompressedJsonConf​(byte[] serialized)
      • redactValue

        public static Map<String,​Object> redactValue​(Map<String,​Object> m,
                                                           String key)
        Creates a new map in which the string value for the given key is replaced with a string of '#' characters of the same length. (If the value is not a string, toString() is called on it and the result is redacted instead.)
        Parameters:
        m - The map that a value will be redacted from
        key - The key pointing to the value to be redacted
        Returns:
        a new map with the value redacted. The original map will not be modified.
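        A minimal sketch of the redaction described above, assuming that a missing key leaves the copy unchanged (the source does not say what happens in that case). The class RedactSketch is hypothetical and this is not Storm's code.

```java
import java.util.HashMap;
import java.util.Map;

final class RedactSketch {
    // Returns a copy of m in which the value for key is replaced by '#' characters.
    static Map<String, Object> redactValue(Map<String, Object> m, String key) {
        Map<String, Object> copy = new HashMap<>(m);
        if (copy.containsKey(key)) {
            // Non-string values are converted to their string form first.
            String value = String.valueOf(copy.get(key));
            copy.put(key, "#".repeat(value.length()));
        }
        return copy;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put("password", "secret");
        System.out.println(redactValue(conf, "password").get("password")); // ######
        System.out.println(conf.get("password"));                          // secret
    }
}
```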
      • setupDefaultUncaughtExceptionHandler

        public static void setupDefaultUncaughtExceptionHandler()
      • setupWorkerUncaughtExceptionHandler

        public static void setupWorkerUncaughtExceptionHandler()
      • parseJvmHeapMemByChildOpts

        public static Double parseJvmHeapMemByChildOpts​(List<String> options,
                                                        Double defaultValue)
        Parses the arguments to extract the JVM heap memory size in MB.
        Returns:
        the value of the JVM heap memory setting (in MB) in a java command.
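        One way such parsing could work is sketched below, assuming the heap size is given through a -Xmx option with a k, m or g suffix. The class HeapOptsSketch, the method name parseHeapMb and the exact pattern are assumptions of this sketch, not taken from Storm.

```java
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class HeapOptsSketch {
    // Assumption: the heap size is expressed as -Xmx<number><k|m|g>.
    private static final Pattern XMX = Pattern.compile("-Xmx(\\d+)([kKmMgG])");

    static Double parseHeapMb(List<String> options, Double defaultValue) {
        if (options == null) {
            return defaultValue;
        }
        for (String option : options) {
            if (option == null) {
                continue;
            }
            Matcher m = XMX.matcher(option);
            if (m.find()) {
                double value = Double.parseDouble(m.group(1));
                switch (Character.toLowerCase(m.group(2).charAt(0))) {
                    case 'k': return value / 1024.0; // kilobytes to megabytes
                    case 'g': return value * 1024.0; // gigabytes to megabytes
                    default:  return value;          // already megabytes
                }
            }
        }
        return defaultValue; // no -Xmx option found
    }

    public static void main(String[] args) {
        System.out.println(parseHeapMb(List.of("-Xms256m", "-Xmx2g"), 768.0)); // 2048.0
        System.out.println(parseHeapMb(List.of("-server"), 768.0));            // 768.0
    }
}
```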
      • isValidConf

        public static boolean isValidConf​(Map<String,​Object> topoConfIn)
      • threadDump

        public static String threadDump()
        Gets some information, including stack trace, for a running thread.
        Returns:
        A human-readable string of the dump.
      • checkDirExists

        public static boolean checkDirExists​(String dir)
      • getConfiguredClass

        public static Object getConfiguredClass​(Map<String,​Object> conf,
                                                Object configKey)
        Return a new instance of a pluggable specified in the conf.
        Parameters:
        conf - The conf to read from.
        configKey - The key pointing to the pluggable class
        Returns:
        an instance of the class or null if it is not specified.
      • isZkAuthenticationConfiguredStormServer

        public static boolean isZkAuthenticationConfiguredStormServer​(Map<String,​Object> conf)
        Is the cluster configured to interact with ZooKeeper in a secure way? This only works when called from within Nimbus or a Supervisor process.
        Parameters:
        conf - the storm configuration, not the topology configuration
        Returns:
        true if it is configured else false.
      • toCompressedJsonConf

        public static byte[] toCompressedJsonConf​(Map<String,​Object> topoConf)
      • nullToZero

        public static double nullToZero​(Double v)
      • OR

        public static <V> V OR​(V a,
                               V b)
        Returns a or b, whichever is the first that is not null.
        Parameters:
        a - something
        b - something else
        Returns:
        a if it is not null, otherwise b
      • partitionFixed

        public static <T> List<List<T>> partitionFixed​(int maxNumChunks,
                                                       Collection<T> coll)
        Fills up chunks out of a collection (given a maximum amount of chunks).

        For example:

        partitionFixed(5, [1,2,3]) -> [[1,2,3]]
        partitionFixed(5, [1..9]) -> [[1,2], [3,4], [5,6], [7,8], [9]]
        partitionFixed(3, [1..10]) -> [[1,2,3,4], [5,6,7], [8,9,10]]

        Parameters:
        maxNumChunks - the maximum number of chunks to return
        coll - the collection to be chunked up
        Returns:
        a list of the chunks, which are themselves lists.
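        The chunking idea can be sketched as an even split in which the leading chunks take one extra item each. This is an illustration only: the class PartitionSketch is hypothetical, the code is not Storm's implementation, and it has been checked only against the [1..9] and [1..10] examples given for this method. Inputs with fewer items than chunks may be handled differently by Storm.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;

final class PartitionSketch {
    static <T> List<List<T>> partitionFixed(int maxNumChunks, Collection<T> coll) {
        List<List<T>> chunks = new ArrayList<>();
        if (maxNumChunks <= 0 || coll == null || coll.isEmpty()) {
            return chunks;
        }
        int base = coll.size() / maxNumChunks;  // minimum chunk size
        int extra = coll.size() % maxNumChunks; // this many leading chunks get one more item
        Iterator<T> it = coll.iterator();
        for (int i = 0; i < maxNumChunks && it.hasNext(); i++) {
            int size = base + (i < extra ? 1 : 0);
            List<T> chunk = new ArrayList<>(size);
            for (int j = 0; j < size; j++) {
                chunk.add(it.next());
            }
            chunks.add(chunk);
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Integer> ten = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        System.out.println(partitionFixed(3, ten)); // [[1, 2, 3, 4], [5, 6, 7], [8, 9, 10]]
    }
}
```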
      • readYamlFile

        public static Object readYamlFile​(String yamlFile)
      • getAvailablePort

        public static int getAvailablePort​(int preferredPort)
        Gets an available port. Consider if it is possible to pass port 0 to the server instead of using this method, since there is no guarantee that the port returned by this method will remain free.
        Returns:
        The preferred port if available, or a random available port
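        A common way to implement this kind of lookup is to try binding the preferred port and fall back to port 0, which asks the operating system for any free port. The sketch below assumes that approach; the class PortSketch is hypothetical and this is not Storm's code. It also illustrates the caveat above: the socket is closed before the method returns, so another process may take the port afterwards.

```java
import java.io.IOException;
import java.net.ServerSocket;

final class PortSketch {
    static int getAvailablePort(int preferredPort) {
        try (ServerSocket socket = new ServerSocket(preferredPort)) {
            return socket.getLocalPort();
        } catch (IOException e) {
            // Preferred port is not usable: ask the OS for any free port instead.
            try (ServerSocket socket = new ServerSocket(0)) {
                return socket.getLocalPort();
            } catch (IOException e2) {
                throw new RuntimeException(e2);
            }
        }
    }

    public static void main(String[] args) {
        // The port printed here is free at the time of the check only.
        System.out.println(getAvailablePort(0));
    }
}
```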
      • getAvailablePort

        public static int getAvailablePort()
        Shortcut to calling getAvailablePort(int) with 0 as the preferred port.
        Returns:
        A random available port
      • findOne

        public static <T> T findOne​(IPredicate<T> pred,
                                    Collection<T> coll)
        Find the first item of coll for which pred.test(...) returns true.
        Parameters:
        pred - The IPredicate to test for
        coll - The Collection of items to search through.
        Returns:
        The first matching value in coll, or null if nothing matches.
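        A minimal sketch of the search described above. It uses java.util.function.Predicate as a stand-in for Storm's IPredicate so that it compiles without Storm on the classpath; the class FindOneSketch is hypothetical.

```java
import java.util.Collection;
import java.util.List;
import java.util.function.Predicate;

final class FindOneSketch {
    // Returns the first item accepted by pred, or null if none matches.
    static <T> T findOne(Predicate<T> pred, Collection<T> coll) {
        if (coll == null) {
            return null;
        }
        for (T item : coll) {
            if (pred.test(item)) {
                return item;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<Integer> numbers = List.of(1, 2, 3, 4);
        System.out.println(FindOneSketch.<Integer>findOne(x -> x > 2, numbers));  // 3
        System.out.println(FindOneSketch.<Integer>findOne(x -> x > 10, numbers)); // null
    }
}
```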
      • findOne

        public static <T,​U> T findOne​(IPredicate<T> pred,
                                            Map<U,​T> map)
      • addVersions

        public static StormTopology addVersions​(StormTopology topology)
        Add version information to the given topology.
        Parameters:
        topology - the topology being submitted (MIGHT BE MODIFIED)
        Returns:
        topology
      • getConfiguredClasspathVersions

        public static NavigableMap<SimpleVersion,​List<String>> getConfiguredClasspathVersions​(Map<String,​Object> conf,
                                                                                                    List<String> currentClassPath)
        Get a map of version to classpath from the conf entry Config.SUPERVISOR_WORKER_VERSION_CLASSPATH_MAP.
        Parameters:
        conf - what to read it out of
        currentClassPath - the current classpath for this version of storm (not included in the conf, but returned by this)
        Returns:
        the map
      • getAlternativeVersionsMap

        public static NavigableMap<String,​IVersionInfo> getAlternativeVersionsMap​(Map<String,​Object> conf)
        Get a mapping of the configured supported versions of storm to their actual versions.
        Parameters:
        conf - what to read the configuration out of.
        Returns:
        the map.
      • getConfiguredWorkerMainVersions

        public static NavigableMap<SimpleVersion,​String> getConfiguredWorkerMainVersions​(Map<String,​Object> conf)
        Get a map of version to worker main from the conf entry Config.SUPERVISOR_WORKER_VERSION_MAIN_MAP.
        Parameters:
        conf - what to read it out of
        Returns:
        the map
      • getConfiguredWorkerLogWriterVersions

        public static NavigableMap<SimpleVersion,​String> getConfiguredWorkerLogWriterVersions​(Map<String,​Object> conf)
        Get a map of version to worker log writer from the conf entry Config.SUPERVISOR_WORKER_VERSION_LOGWRITER_MAP.
        Parameters:
        conf - what to read it out of
        Returns:
        the map
      • isLocalhostAddress

        public static boolean isLocalhostAddress​(String address)
      • merge

        public static <K,​V> Map<K,​V> merge​(Map<? extends K,​? extends V> first,
                                                       Map<? extends K,​? extends V> other)
      • convertToArray

        public static <V> ArrayList<V> convertToArray​(Map<Integer,​V> srcMap,
                                                      int start)
      • isValidKey

        public static boolean isValidKey​(String key)
        Validates blob key.
        Parameters:
        key - Key for the blob.
      • findComponentCycles

        public static List<List<String>> findComponentCycles​(StormTopology topology,
                                                             String topoId)
        Find and return component cycles in the topology graph, starting from the spouts. Returns a list of cycles; each cycle may consist of one or more components. Components that cannot be reached from any of the spouts are ignored.
        Returns:
        a List of cycles. Each cycle has a list of component names.
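        Cycle detection of this kind is commonly done with a depth-first search that starts at the spouts and records a cycle whenever an edge leads back to a component on the current path. The sketch below assumes the topology is given as a plain adjacency map of component names; the class CycleSketch and the method findCycles are hypothetical and this is not Storm's algorithm. It reports one cycle per back edge, which is not necessarily every elementary cycle in the graph.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class CycleSketch {
    // edges maps a component name to the components that consume its output.
    static List<List<String>> findCycles(Map<String, List<String>> edges, List<String> spouts) {
        List<List<String>> cycles = new ArrayList<>();
        Set<String> seen = new HashSet<>();
        for (String spout : spouts) {
            dfs(spout, edges, new ArrayList<>(), seen, cycles);
        }
        return cycles;
    }

    private static void dfs(String node, Map<String, List<String>> edges, List<String> path,
                            Set<String> seen, List<List<String>> cycles) {
        int idx = path.indexOf(node);
        if (idx >= 0) {
            // Back edge: everything from the first occurrence of node forms a cycle.
            cycles.add(new ArrayList<>(path.subList(idx, path.size())));
            return;
        }
        if (!seen.add(node)) {
            return; // already explored via another path
        }
        path.add(node);
        for (String next : edges.getOrDefault(node, List.of())) {
            dfs(next, edges, path, seen, cycles);
        }
        path.remove(path.size() - 1);
    }

    public static void main(String[] args) {
        Map<String, List<String>> edges = Map.of(
            "spout", List.of("a"),
            "a", List.of("b"),
            "b", List.of("a"),  // b feeds back into a
            "x", List.of("x")); // not reachable from the spout, so ignored
        System.out.println(findCycles(edges, List.of("spout"))); // [[a, b]]
    }
}
```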
      • validateCycleFree

        public static void validateCycleFree​(StormTopology topology,
                                             String name)
                                      throws InvalidTopologyException
        Validate that the topology is cycle free. If not, then throw an InvalidTopologyException describing the cycle(s).
        Parameters:
        topology - StormTopology instance to examine.
        name - Name of the topology, used in exception error message.
        Throws:
        InvalidTopologyException - if there are cycles, with message describing the cycles encountered.