Package org.apache.storm.utils
Class Utils
- java.lang.Object
-
- org.apache.storm.utils.Utils
-
public class Utils extends Object
-
-
Nested Class Summary
Nested Classes (modifier and type, class, description)
static class Utils.SmartThread
    A thread that can answer if it is sleeping in the case of simulated time.
static class Utils.UptimeComputer
-
Field Summary
Fields (modifier and type, field)
static Pattern BLOB_KEY_PATTERN
static String DEFAULT_STREAM_ID
static org.slf4j.Logger LOG
-
Constructor Summary
Constructors
Utils()
-
Method Summary
Methods (modifier and type, method, description)
static void addShutdownHookWithDelayedForceKill(Runnable func, int numSecs)
    Adds the user supplied function as a shutdown hook for cleanup.
static void addShutdownHookWithForceKillIn1Sec(Runnable func)
    Adds the user supplied function as a shutdown hook for cleanup.
static StormTopology addVersions(StormTopology topology)
    Add version information to the given topology.
static Utils.SmartThread asyncLoop(Callable afn)
    Convenience method used when only the function is given.
static Utils.SmartThread asyncLoop(Callable afn, boolean isDaemon, Thread.UncaughtExceptionHandler eh, int priority, boolean isFactory, boolean startImmediately, String threadName)
    Creates a thread that calls the given code repeatedly, sleeping for an interval of seconds equal to the return value of the previous call.
static Utils.SmartThread asyncLoop(Callable afn, String threadName, Thread.UncaughtExceptionHandler eh)
    Convenience method used when only the function and name suffix are given.
static long bitXor(Long a, Long b)
static long bitXorVals(List<Long> coll)
static boolean checkDirExists(String dir)
static boolean checkFileExists(String path)
static <V> ArrayList<V> convertToArray(Map<Integer,V> srcMap, int start)
static Thread.UncaughtExceptionHandler createDefaultUncaughtExceptionHandler()
static Thread.UncaughtExceptionHandler createWorkerUncaughtExceptionHandler()
static <T> T deserialize(byte[] serialized, Class<T> clazz)
static <T> T deserializeFromString(String str, Class<T> clazz)
    Deserialize an object stored in a string.
static boolean exceptionCauseIsInstanceOf(Class klass, Throwable throwable)
    Checks if a throwable is an instance of a particular class.
static void exitProcess(int val, String msg)
static Map<String,Object> findAndReadConfigFile(String name)
static Map<String,Object> findAndReadConfigFile(String name, boolean mustExist)
static List<List<String>> findComponentCycles(StormTopology topology, String topoId)
    Find and return components cycles in the topology graph when starting from spout.
static <T> T findOne(IPredicate<T> pred, Collection<T> coll)
    Find the first item of coll for which pred.test(...) returns true.
static <T,U> T findOne(IPredicate<T> pred, Map<U,T> map)
static List<URL> findResources(String name)
static void forceDelete(String path)
    Deletes a file or directory and its contents if it exists.
protected void forceDeleteImpl(String path)
static Map<String,Object> fromCompressedJsonConf(byte[] serialized)
static <S,T> T get(Map<S,T> m, S key, T def)
static NavigableMap<String,IVersionInfo> getAlternativeVersionsMap(Map<String,Object> conf)
    Get a mapping of the configured supported versions of storm to their actual versions.
static int getAvailablePort()
    Shortcut to calling getAvailablePort(int) with 0 as the preferred port.
static int getAvailablePort(int preferredPort)
    Gets an available port.
static ClientBlobStore getClientBlobStore(Map<String,Object> conf)
static <T> T getCompatibleVersion(NavigableMap<SimpleVersion,T> versionedMap, SimpleVersion desiredVersion, String what, T defaultValue)
static ComponentCommon getComponentCommon(StormTopology topology, String id)
static Map<String,Object> getConfigFromClasspath(List<String> cp, Map<String,Object> conf)
static Object getConfiguredClass(Map<String,Object> conf, Object configKey)
    Return a new instance of a pluggable specified in the conf.
static NavigableMap<SimpleVersion,List<String>> getConfiguredClasspathVersions(Map<String,Object> conf, List<String> currentClassPath)
    Get a map of version to classpath from the conf Config.SUPERVISOR_WORKER_VERSION_CLASSPATH_MAP
static NavigableMap<SimpleVersion,String> getConfiguredWorkerLogWriterVersions(Map<String,Object> conf)
    Get a map of version to worker log writer from the conf Config.SUPERVISOR_WORKER_VERSION_LOGWRITER_MAP
static NavigableMap<SimpleVersion,String> getConfiguredWorkerMainVersions(Map<String,Object> conf)
    Get a map of version to worker main from the conf Config.SUPERVISOR_WORKER_VERSION_MAIN_MAP
static GlobalStreamId getGlobalStreamId(String componentId, String streamId)
static List<String> getRepeat(List<String> list)
static Object getSetComponentObject(ComponentObject obj)
static org.apache.storm.shade.org.apache.zookeeper.data.ACL getSuperUserAcl(Map<String,Object> conf)
    Get the ACL for nimbus/supervisor.
static String getTopologyId(String name, Nimbus.Iface client)
static TopologyInfo getTopologyInfo(String name, String asUser, Map<String,Object> topoConf)
static List<org.apache.storm.shade.org.apache.zookeeper.data.ACL> getWorkerACL(Map<String,Object> conf)
    Get the ZK ACLs that a worker should use when writing to ZK.
static byte[] gunzip(byte[] data)
static byte[] gzip(byte[] data)
static void handleUncaughtException(Throwable t)
static void handleUncaughtException(Throwable t, Set<Class<?>> allowedExceptions, boolean worker)
    Handles uncaught exceptions.
static void handleWorkerUncaughtException(Throwable t)
static String hostname()
    Gets the storm.local.hostname value, or tries to figure out the local hostname if it is not set in the config.
protected String hostnameImpl()
static TreeMap<Integer,Integer> integerDivided(int sum, int numPieces)
static boolean isLocalhostAddress(String address)
static boolean isOnWindows()
static boolean isSystemId(String id)
static boolean isValidConf(Map<String,Object> topoConfIn)
static boolean isValidKey(String key)
    Validates blob key.
static boolean isZkAuthenticationConfiguredStormServer(Map<String,Object> conf)
    Is the cluster configured to interact with ZooKeeper in a secure way? This only works when called from within Nimbus or a Supervisor process.
static boolean isZkAuthenticationConfiguredTopology(Map<String,Object> conf)
    Is the topology configured to have ZooKeeper authentication.
static <T> T javaDeserialize(byte[] serialized, Class<T> clazz)
static byte[] javaSerialize(Object obj)
static <T> String join(Iterable<T> coll, String sep)
static String localHostname()
protected String localHostnameImpl()
static Utils.UptimeComputer makeUptimeComputer()
Utils.UptimeComputer makeUptimeComputerImpl()
static String memoizedLocalHostname()
static <K,V> Map<K,V> merge(Map<? extends K,? extends V> first, Map<? extends K,? extends V> other)
static Runnable mkSuicideFn()
static double nullToZero(Double v)
static <V> V OR(V a, V b)
    a or b the first one that is not null.
static Map<String,Object> parseJson(String json)
static Double parseJvmHeapMemByChildOpts(List<String> options, Double defaultValue)
    parses the arguments to extract jvm heap memory size in MB.
static org.apache.storm.shade.org.apache.zookeeper.data.Id parseZkId(String id, String configName)
static <T> List<List<T>> partitionFixed(int maxNumChunks, Collection<T> coll)
    Fills up chunks out of a collection (given a maximum amount of chunks).
static String processPid()
    Get process PID.
static void readAndLogStream(String prefix, InputStream in)
static Map<String,Object> readCommandLineOpts()
static Map<String,Object> readDefaultConfig()
static Map<String,Object> readStormConfig()
static Object readYamlFile(String yamlFile)
static Map<String,Object> redactValue(Map<String,Object> m, String key)
    Creates a new map with a string value in the map replaced with an equivalently-lengthed string of '#'.
static void resetClassLoaderForJavaDeSerialize()
static Map<Object,List<Object>> reverseMap(List<List<Object>> listSeq)
    "[[:a 1] [:b 1] [:c 2]] -> {1 [:a :b] 2 :c}" Reverses an assoc-list style Map like reverseMap(Map...)
static <K,V> HashMap<V,List<K>> reverseMap(Map<K,V> map)
    "{:a 1 :b 1 :c 2} -> {1 [:a :b] 2 :c}".
static long secureRandomLong()
static byte[] serialize(Object obj)
static String serializeToString(Object obj)
    Serialize an object using the configured serialization and then base64 encode it into a string.
static void setClassLoaderForJavaDeSerialize(ClassLoader cl)
static Utils setInstance(Utils u)
    Provide an instance of this class for delegates to use.
static void setupDefaultUncaughtExceptionHandler()
static void setupWorkerUncaughtExceptionHandler()
static void sleep(long millis)
static void sleepNoSimulation(long millis)
static String threadDump()
    Gets some information, including stack trace, for a running thread.
static <T> T thriftDeserialize(Class<T> c, byte[] b)
static <T> T thriftDeserialize(Class<T> c, byte[] b, int offset, int length)
static byte[] thriftSerialize(org.apache.storm.thrift.TBase t)
static byte[] toByteArray(ByteBuffer buffer)
static byte[] toCompressedJsonConf(Map<String,Object> topoConf)
static int toPositive(int number)
    A cheap way to deterministically convert a number to a positive value.
static List<Object> tuple(Object... values)
static <T extends Throwable> void unwrapAndThrow(Class<T> klass, Throwable t)
static <T extends Throwable> T unwrapTo(Class<T> klass, Throwable t)
static String urlDecodeUtf8(String s)
    URL decode the given string using the UTF-8 charset.
static String urlEncodeUtf8(String s)
    URL encode the given string using the UTF-8 charset.
static String uuid()
static void validateCycleFree(StormTopology topology, String name)
    Validate that the topology is cycle free.
static void validateTopologyBlobStoreMap(Map<String,Object> topoConf)
    Validate topology blobstore map.
static void validateTopologyBlobStoreMap(Map<String,Object> topoConf, BlobStore blobStore)
    Validate topology blobstore map.
static void validateTopologyBlobStoreMap(Map<String,Object> topoConf, NimbusBlobStore client)
    Validate topology blobstore map.
static void validateTopologyName(String name)
    Validates topology name.
static RuntimeException wrapInRuntime(Exception e)
static double zeroIfNaNOrInf(double x)
-
-
-
Field Detail
-
LOG
public static final org.slf4j.Logger LOG
-
DEFAULT_STREAM_ID
public static final String DEFAULT_STREAM_ID
- See Also:
- Constant Field Values
-
BLOB_KEY_PATTERN
public static final Pattern BLOB_KEY_PATTERN
-
-
Method Detail
-
setInstance
public static Utils setInstance(Utils u)
Provide an instance of this class for delegates to use. To mock out delegated methods, provide an instance of a subclass that overrides the implementation of the delegated method.
- Parameters:
u - a Utils instance
- Returns:
the previously set instance
-
setClassLoaderForJavaDeSerialize
public static void setClassLoaderForJavaDeSerialize(ClassLoader cl)
-
resetClassLoaderForJavaDeSerialize
public static void resetClassLoaderForJavaDeSerialize()
-
findAndReadConfigFile
public static Map<String,Object> findAndReadConfigFile(String name, boolean mustExist)
-
urlEncodeUtf8
public static String urlEncodeUtf8(String s)
URL encode the given string using the UTF-8 charset. Once Storm is baselined to Java 11, we can use URLEncoder.encode(String, Charset) instead, which obsoletes this method.
-
urlDecodeUtf8
public static String urlDecodeUtf8(String s)
URL decode the given string using the UTF-8 charset. Once Storm is baselined to Java 11, we can use URLDecoder.decode(String, Charset) instead, which obsoletes this method.
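As a rough illustration of the replacement the two notes above mention, the following sketch calls the Charset-based overloads of URLEncoder.encode and URLDecoder.decode directly. The class name UrlUtf8Sketch and its method names are made up for this example and are not part of Storm; the overloads require Java 10 or later.

```java
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not Storm code: shows the JDK calls named in the notes above.
final class UrlUtf8Sketch {
    static String encode(String s) {
        // The Charset overload avoids the checked UnsupportedEncodingException
        return URLEncoder.encode(s, StandardCharsets.UTF_8);
    }

    static String decode(String s) {
        return URLDecoder.decode(s, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String encoded = encode("a b/c");
        System.out.println(encoded);          // prints a+b%2Fc
        System.out.println(decode(encoded));  // prints a b/c
    }
}
```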
-
addShutdownHookWithForceKillIn1Sec
public static void addShutdownHookWithForceKillIn1Sec(Runnable func)
Adds the user supplied function as a shutdown hook for cleanup. Also adds a function that sleeps for a second and then halts the runtime to avoid any zombie process in case cleanup function hangs.
-
addShutdownHookWithDelayedForceKill
public static void addShutdownHookWithDelayedForceKill(Runnable func, int numSecs)
Adds the user supplied function as a shutdown hook for cleanup. Also adds a function that sleeps for numSecs and then halts the runtime to avoid any zombie process in case cleanup function hangs.
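The pattern described above (a cleanup hook plus a second hook that sleeps and then halts the runtime) could be sketched with plain JDK calls as follows. This is not Storm's implementation: the class ShutdownHookSketch, the buildHooks/register split, and the exit status 1 are assumptions chosen so the hooks can be exercised without actually halting a JVM.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch, not Storm code.
final class ShutdownHookSketch {
    /** Builds the cleanup hook and the delayed force-kill hook without registering them. */
    static List<Thread> buildHooks(Runnable cleanup, int numSecs, Runnable halt) {
        Thread cleanupHook = new Thread(cleanup, "cleanup-hook");
        Thread forceKill = new Thread(() -> {
            try {
                Thread.sleep(numSecs * 1000L); // grace period for the cleanup hook
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            halt.run(); // cleanup may still be hanging; stop regardless
        }, "force-kill-hook");
        return Arrays.asList(cleanupHook, forceKill);
    }

    /** Registers both hooks; Runtime.halt skips any hooks that are still running. */
    static void register(Runnable cleanup, int numSecs) {
        // Exit status 1 is an arbitrary choice for this sketch
        for (Thread hook : buildHooks(cleanup, numSecs, () -> Runtime.getRuntime().halt(1))) {
            Runtime.getRuntime().addShutdownHook(hook);
        }
    }
}
```

Because the JVM starts all registered hooks concurrently, the force-kill hook bounds the total shutdown time at roughly numSecs even when cleanup never returns.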
-
isSystemId
public static boolean isSystemId(String id)
-
asyncLoop
public static Utils.SmartThread asyncLoop(Callable afn, boolean isDaemon, Thread.UncaughtExceptionHandler eh, int priority, boolean isFactory, boolean startImmediately, String threadName)
Creates a thread that calls the given code repeatedly, sleeping for an interval of seconds equal to the return value of the previous call. The given afn may be a Callable that returns the number of seconds to sleep, or it may be a Callable that returns another Callable that in turn returns the number of seconds to sleep. In the latter case, isFactory should be true.
- Parameters:
afn - the code to call on each iteration
isDaemon - whether the new thread should be a daemon thread
eh - code to call when afn throws an exception
priority - the new thread's priority
isFactory - whether afn returns a callable instead of sleep seconds
startImmediately - whether to start the thread before returning
threadName - a suffix to be appended to the thread name
- Returns:
the newly created thread
- See Also:
Thread
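The loop described above can be approximated with a plain Thread, as in the sketch below. It is a simplification under stated assumptions, not Storm's code: it returns a java.lang.Thread rather than Utils.SmartThread, omits the factory mode, priority and exception handler, and treats a null return value as a signal to stop, which is a convention of this sketch only.

```java
import java.util.concurrent.Callable;

// Hypothetical sketch, not Storm code.
final class AsyncLoopSketch {
    static Thread asyncLoop(Callable<Long> fn, String threadName) {
        Thread t = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    Long sleepSecs = fn.call();   // run one iteration
                    if (sleepSecs == null) {
                        return;                   // sketch convention: null ends the loop
                    }
                    if (sleepSecs > 0) {
                        Thread.sleep(sleepSecs * 1000L); // wait before the next call
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }, threadName);
        t.setDaemon(true);
        t.start();
        return t;
    }
}
```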
-
asyncLoop
public static Utils.SmartThread asyncLoop(Callable afn, String threadName, Thread.UncaughtExceptionHandler eh)
Convenience method used when only the function and name suffix are given.
- Parameters:
afn - the code to call on each iteration
threadName - a suffix to be appended to the thread name
- Returns:
the newly created thread
- See Also:
Thread
-
asyncLoop
public static Utils.SmartThread asyncLoop(Callable afn)
Convenience method used when only the function is given.
- Parameters:
afn - the code to call on each iteration
- Returns:
the newly created thread
-
exceptionCauseIsInstanceOf
public static boolean exceptionCauseIsInstanceOf(Class klass, Throwable throwable)
Checks if a throwable is an instance of a particular class.
- Parameters:
klass - The class you're expecting
throwable - The throwable you expect to be an instance of klass
- Returns:
true if throwable is instance of klass, false otherwise.
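The method name suggests that the check also considers the causes of the throwable, although the description above does not say so. The sketch below shows one way such a check might look, assuming it walks the getCause() chain; the class CauseChainSketch and that assumption are illustrative only.

```java
// Hypothetical sketch, not Storm code: assumes the cause chain is searched.
final class CauseChainSketch {
    static boolean causeIsInstanceOf(Class<?> klass, Throwable throwable) {
        // Check the throwable itself, then each nested cause in turn
        for (Throwable t = throwable; t != null; t = t.getCause()) {
            if (klass.isInstance(t)) {
                return true;
            }
        }
        return false;
    }
}
```

For example, causeIsInstanceOf(java.io.IOException.class, new RuntimeException(new java.io.IOException())) evaluates to true in this sketch.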
-
unwrapAndThrow
public static <T extends Throwable> void unwrapAndThrow(Class<T> klass, Throwable t) throws T
- Throws:
T
-
wrapInRuntime
public static RuntimeException wrapInRuntime(Exception e)
-
secureRandomLong
public static long secureRandomLong()
-
hostname
public static String hostname() throws UnknownHostException
Gets the storm.local.hostname value, or tries to figure out the local hostname if it is not set in the config.
- Returns:
a string representation of the hostname.
- Throws:
UnknownHostException
-
localHostname
public static String localHostname() throws UnknownHostException
- Throws:
UnknownHostException
-
exitProcess
public static void exitProcess(int val, String msg)
-
uuid
public static String uuid()
-
javaSerialize
public static byte[] javaSerialize(Object obj)
-
javaDeserialize
public static <T> T javaDeserialize(byte[] serialized, Class<T> clazz)
-
get
public static <S,T> T get(Map<S,T> m, S key, T def)
-
zeroIfNaNOrInf
public static double zeroIfNaNOrInf(double x)
-
parseZkId
public static org.apache.storm.shade.org.apache.zookeeper.data.Id parseZkId(String id, String configName)
-
getSuperUserAcl
public static org.apache.storm.shade.org.apache.zookeeper.data.ACL getSuperUserAcl(Map<String,Object> conf)
Get the ACL for nimbus/supervisor. The Super User ACL. This assumes that security is enabled.
- Parameters:
conf - the config to get the super User ACL from
- Returns:
the super user ACL.
-
getWorkerACL
public static List<org.apache.storm.shade.org.apache.zookeeper.data.ACL> getWorkerACL(Map<String,Object> conf)
Get the ZK ACLs that a worker should use when writing to ZK.
- Parameters:
conf - the config for the topology.
- Returns:
the ACLs
-
isZkAuthenticationConfiguredTopology
public static boolean isZkAuthenticationConfiguredTopology(Map<String,Object> conf)
Is the topology configured to have ZooKeeper authentication.
- Parameters:
conf - the topology configuration
- Returns:
true if ZK is configured else false
-
handleUncaughtException
public static void handleUncaughtException(Throwable t, Set<Class<?>> allowedExceptions, boolean worker)
Handles uncaught exceptions.
- Parameters:
worker - true if this is for handling worker exceptions
-
handleUncaughtException
public static void handleUncaughtException(Throwable t)
-
handleWorkerUncaughtException
public static void handleWorkerUncaughtException(Throwable t)
-
thriftSerialize
public static byte[] thriftSerialize(org.apache.storm.thrift.TBase t)
-
thriftDeserialize
public static <T> T thriftDeserialize(Class<T> c, byte[] b)
-
thriftDeserialize
public static <T> T thriftDeserialize(Class<T> c, byte[] b, int offset, int length)
-
sleepNoSimulation
public static void sleepNoSimulation(long millis)
-
sleep
public static void sleep(long millis)
-
makeUptimeComputer
public static Utils.UptimeComputer makeUptimeComputer()
-
reverseMap
public static <K,V> HashMap<V,List<K>> reverseMap(Map<K,V> map)
"{:a 1 :b 1 :c 2} -> {1 [:a :b] 2 :c}".
Example usage in Java:
Map<Integer, String> tasks; Map<String, List<Integer>> componentTasks = Utils.reverseMap(tasks);
The order of the resulting list values depends on the ordering properties of the Map passed in. The caller is responsible for passing an ordered map if they expect the result to be consistently ordered as well.
- Parameters:
map - to reverse
- Returns:
a reversed map
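To make the ordering remark concrete, here is a minimal sketch of such a reversal written against the JDK only. The class ReverseMapSketch is hypothetical and is not Storm's implementation; it shows that each value list follows the iteration order of the input map, which is why a LinkedHashMap is used as input.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not Storm code.
final class ReverseMapSketch {
    static <K, V> HashMap<V, List<K>> reverseMap(Map<K, V> map) {
        HashMap<V, List<K>> reversed = new HashMap<>();
        for (Map.Entry<K, V> e : map.entrySet()) {
            // Keys are appended in the iteration order of the input map
            reversed.computeIfAbsent(e.getValue(), v -> new ArrayList<>()).add(e.getKey());
        }
        return reversed;
    }

    public static void main(String[] args) {
        Map<Integer, String> tasks = new LinkedHashMap<>(); // insertion-ordered input
        tasks.put(1, "spout");
        tasks.put(2, "bolt");
        tasks.put(3, "bolt");
        System.out.println(reverseMap(tasks).get("bolt")); // prints [2, 3]
    }
}
```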
-
reverseMap
public static Map<Object,List<Object>> reverseMap(List<List<Object>> listSeq)
"[[:a 1] [:b 1] [:c 2]] -> {1 [:a :b] 2 :c}". Reverses an assoc-list style Map like reverseMap(Map...)
- Parameters:
listSeq - to reverse
- Returns:
a reversed map
-
isOnWindows
public static boolean isOnWindows()
-
checkFileExists
public static boolean checkFileExists(String path)
-
forceDelete
public static void forceDelete(String path) throws IOException
Deletes a file or directory and its contents if it exists. Does not complain if the input is null or does not exist.
- Parameters:
path - the path to the file or directory
- Throws:
IOException
-
serialize
public static byte[] serialize(Object obj)
-
deserialize
public static <T> T deserialize(byte[] serialized, Class<T> clazz)
-
serializeToString
public static String serializeToString(Object obj)
Serialize an object using the configured serialization and then base64 encode it into a string.
- Parameters:
obj - the object to encode
- Returns:
a string with the encoded object in it.
-
deserializeFromString
public static <T> T deserializeFromString(String str, Class<T> clazz)
Deserialize an object stored in a string. The String is assumed to be a base64 encoded string containing the bytes to actually deserialize.
- Parameters:
str - the encoded string.
clazz - the thrift class we are expecting.
- Returns:
the decoded object
-
toByteArray
public static byte[] toByteArray(ByteBuffer buffer)
-
mkSuicideFn
public static Runnable mkSuicideFn()
-
readAndLogStream
public static void readAndLogStream(String prefix, InputStream in)
-
getComponentCommon
public static ComponentCommon getComponentCommon(StormTopology topology, String id)
-
gzip
public static byte[] gzip(byte[] data)
-
gunzip
public static byte[] gunzip(byte[] data)
-
getGlobalStreamId
public static GlobalStreamId getGlobalStreamId(String componentId, String streamId)
-
getSetComponentObject
public static Object getSetComponentObject(ComponentObject obj)
-
toPositive
public static int toPositive(int number)
A cheap way to deterministically convert a number to a positive value. When the input is positive, the original value is returned. When the input number is negative, the returned positive value is the original value bitwise ANDed with Integer.MAX_VALUE (0x7fffffff), which is not its absolute value.
- Parameters:
number - a given number
- Returns:
a positive number.
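The masking described above can be tried out with the following sketch. The class ToPositiveSketch is hypothetical; the single expression number & Integer.MAX_VALUE is taken from the description. Note that clearing the sign bit maps Integer.MIN_VALUE to 0, so the result is strictly speaking non-negative.

```java
// Hypothetical sketch, not Storm code.
final class ToPositiveSketch {
    static int toPositive(int number) {
        // Clears the sign bit; non-negative inputs pass through unchanged
        return number & Integer.MAX_VALUE;
    }

    public static void main(String[] args) {
        System.out.println(toPositive(5));                 // prints 5
        System.out.println(toPositive(-1));                // prints 2147483647
        System.out.println(toPositive(-5));                // prints 2147483643, not 5
        System.out.println(toPositive(Integer.MIN_VALUE)); // prints 0
    }
}
```

Unlike Math.abs, which returns Integer.MIN_VALUE unchanged for that input, the mask never produces a negative number, which is what makes it convenient for bucket or partition indexing.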
-
processPid
public static String processPid()
Get process PID.
- Returns:
the pid of this JVM, because Java doesn't provide a real way to do this.
-
redactValue
public static Map<String,Object> redactValue(Map<String,Object> m, String key)
Creates a new map with a string value in the map replaced with an equal-length string of '#'. (If the object is not a string, toString() will be called on it and the result replaced.)
- Parameters:
m - The map that a value will be redacted from
key - The key pointing to the value to be redacted
- Returns:
a new map with the value redacted. The original map will not be modified.
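A copy-then-mask approach matching the description could look like the sketch below. The class RedactSketch is hypothetical, and leaving a missing or null value untouched is an assumption of this sketch rather than documented behaviour.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not Storm code.
final class RedactSketch {
    static Map<String, Object> redactValue(Map<String, Object> m, String key) {
        Map<String, Object> copy = new HashMap<>(m); // shallow copy; the input stays untouched
        Object value = copy.get(key);
        if (value != null) {                          // assumption: absent values are left alone
            char[] mask = new char[value.toString().length()];
            Arrays.fill(mask, '#');                   // same length as the original text
            copy.put(key, new String(mask));
        }
        return copy;
    }
}
```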
-
createDefaultUncaughtExceptionHandler
public static Thread.UncaughtExceptionHandler createDefaultUncaughtExceptionHandler()
-
createWorkerUncaughtExceptionHandler
public static Thread.UncaughtExceptionHandler createWorkerUncaughtExceptionHandler()
-
setupDefaultUncaughtExceptionHandler
public static void setupDefaultUncaughtExceptionHandler()
-
setupWorkerUncaughtExceptionHandler
public static void setupWorkerUncaughtExceptionHandler()
-
parseJvmHeapMemByChildOpts
public static Double parseJvmHeapMemByChildOpts(List<String> options, Double defaultValue)
Parses the arguments to extract the JVM heap memory size in MB.
- Returns:
the value of the JVM heap memory setting (in MB) in a java command.
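As an illustration of what extracting the heap size in MB involves, the sketch below scans option strings for -Xmx and converts the value to megabytes. Everything here is an assumption made for the example: the class HeapOptsSketch, the regular expression, the rule that the last -Xmx wins, and the treatment of a value without a unit suffix as bytes. Storm's actual parsing rules may differ.

```java
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch, not Storm code.
final class HeapOptsSketch {
    private static final Pattern XMX = Pattern.compile("-Xmx(\\d+)([kKmMgG]?)");

    static Double parseHeapMb(List<String> options, Double defaultValue) {
        if (options == null) {
            return defaultValue;
        }
        Double found = null;
        for (String opt : options) {
            if (opt == null) {
                continue;
            }
            Matcher m = XMX.matcher(opt);
            while (m.find()) {                       // assumption: the last -Xmx wins
                double value = Double.parseDouble(m.group(1));
                switch (m.group(2).toLowerCase()) {
                    case "k": found = value / 1024.0; break;
                    case "m": found = value; break;
                    case "g": found = value * 1024.0; break;
                    default:  found = value / (1024.0 * 1024.0); // no suffix: bytes
                }
            }
        }
        return found != null ? found : defaultValue;
    }
}
```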
-
getClientBlobStore
public static ClientBlobStore getClientBlobStore(Map<String,Object> conf)
-
getTopologyInfo
public static TopologyInfo getTopologyInfo(String name, String asUser, Map<String,Object> topoConf)
-
getTopologyId
public static String getTopologyId(String name, Nimbus.Iface client)
-
validateTopologyBlobStoreMap
public static void validateTopologyBlobStoreMap(Map<String,Object> topoConf) throws InvalidTopologyException, AuthorizationException
Validate topology blobstore map.
- Parameters:
topoConf - Topology configuration
- Throws:
InvalidTopologyException
AuthorizationException
-
validateTopologyBlobStoreMap
public static void validateTopologyBlobStoreMap(Map<String,Object> topoConf, NimbusBlobStore client) throws InvalidTopologyException, AuthorizationException
Validate topology blobstore map.
- Parameters:
topoConf - Topology configuration
client - The NimbusBlobStore client. It must call prepare() before being used here.
- Throws:
InvalidTopologyException
AuthorizationException
-
validateTopologyBlobStoreMap
public static void validateTopologyBlobStoreMap(Map<String,Object> topoConf, BlobStore blobStore) throws InvalidTopologyException, AuthorizationException
Validate topology blobstore map.
-
threadDump
public static String threadDump()
Gets some information, including stack trace, for a running thread.
- Returns:
A human-readable string of the dump.
-
checkDirExists
public static boolean checkDirExists(String dir)
-
getConfiguredClass
public static Object getConfiguredClass(Map<String,Object> conf, Object configKey)
Return a new instance of a pluggable specified in the conf.
- Parameters:
conf - The conf to read from.
configKey - The key pointing to the pluggable class
- Returns:
an instance of the class or null if it is not specified.
-
isZkAuthenticationConfiguredStormServer
public static boolean isZkAuthenticationConfiguredStormServer(Map<String,Object> conf)
Is the cluster configured to interact with ZooKeeper in a secure way? This only works when called from within Nimbus or a Supervisor process.
- Parameters:
conf - the storm configuration, not the topology configuration
- Returns:
true if it is configured else false.
-
nullToZero
public static double nullToZero(Double v)
-
OR
public static <V> V OR(V a, V b)
Returns a or b, whichever is the first one that is not null.
- Parameters:
a - something
b - something else
- Returns:
a or b, whichever is the first one that is not null
-
partitionFixed
public static <T> List<List<T>> partitionFixed(int maxNumChunks, Collection<T> coll)
Fills up chunks out of a collection (given a maximum amount of chunks). For example:
partitionFixed(5, [1,2,3]) -> [[1,2,3]]
partitionFixed(5, [1..9]) -> [[1,2], [3,4], [5,6], [7,8], [9]]
partitionFixed(3, [1..10]) -> [[1,2,3,4], [5,6,7], [8,9,10]]
- Parameters:
maxNumChunks - the maximum number of chunks to return
coll - the collection to be chunked up
- Returns:
a list of the chunks, which are themselves lists.
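One way to produce chunkings like the [1..9] and [1..10] examples above is to spread the items as evenly as possible and put the larger chunks first. The sketch below does that; the class PartitionFixedSketch and the even-distribution rule are assumptions, not a copy of Storm's code. With three items and five chunks this sketch yields three single-item chunks, which may not match the first example listed above.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch, not Storm code.
final class PartitionFixedSketch {
    static <T> List<List<T>> partitionFixed(int maxNumChunks, Collection<T> coll) {
        List<List<T>> chunks = new ArrayList<>();
        if (maxNumChunks <= 0 || coll == null || coll.isEmpty()) {
            return chunks;
        }
        int base = coll.size() / maxNumChunks;   // minimum chunk size
        int larger = coll.size() % maxNumChunks; // how many chunks get one extra item
        Iterator<T> it = coll.iterator();
        for (int i = 0; i < maxNumChunks && it.hasNext(); i++) {
            int size = base + (i < larger ? 1 : 0);
            List<T> chunk = new ArrayList<>(size);
            for (int j = 0; j < size; j++) {
                chunk.add(it.next());
            }
            chunks.add(chunk);
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Integer> ten = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            ten.add(i);
        }
        System.out.println(partitionFixed(3, ten)); // prints [[1, 2, 3, 4], [5, 6, 7], [8, 9, 10]]
    }
}
```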
-
getAvailablePort
public static int getAvailablePort(int preferredPort)
Gets an available port. Consider if it is possible to pass port 0 to the server instead of using this method, since there is no guarantee that the port returned by this method will remain free.
- Returns:
The preferred port if available, or a random available port
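The behaviour described above, including its race condition, can be seen in a small sketch built on java.net.ServerSocket. The class AvailablePortSketch is hypothetical and not Storm's implementation: it probes the preferred port by binding to it, falls back to port 0 so the operating system picks one, and closes the socket before returning, which is exactly why the port may be taken again by the time the caller uses it.

```java
import java.io.IOException;
import java.net.ServerSocket;

// Hypothetical sketch, not Storm code.
final class AvailablePortSketch {
    static int getAvailablePort(int preferredPort) {
        try (ServerSocket socket = new ServerSocket(preferredPort)) {
            return socket.getLocalPort();   // preferred port (or an OS-chosen one if 0) was free
        } catch (IOException e) {
            // Preferred port is in use; fall through and ask the OS for any free port
        }
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        } catch (IOException e) {
            throw new IllegalStateException("no free port found", e);
        }
    }

    static int getAvailablePort() {
        return getAvailablePort(0);
    }
}
```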
-
getAvailablePort
public static int getAvailablePort()
Shortcut to calling getAvailablePort(int) with 0 as the preferred port.
- Returns:
A random available port
-
findOne
public static <T> T findOne(IPredicate<T> pred, Collection<T> coll)
Find the first item of coll for which pred.test(...) returns true.
- Parameters:
pred - The IPredicate to test for
coll - The Collection of items to search through.
- Returns:
The first matching value in coll, or null if nothing matches.
-
findOne
public static <T,U> T findOne(IPredicate<T> pred, Map<U,T> map)
-
memoizedLocalHostname
public static String memoizedLocalHostname() throws UnknownHostException
- Throws:
UnknownHostException
-
addVersions
public static StormTopology addVersions(StormTopology topology)
Add version information to the given topology.
- Parameters:
topology - the topology being submitted (MIGHT BE MODIFIED)
- Returns:
topology
-
getConfiguredClasspathVersions
public static NavigableMap<SimpleVersion,List<String>> getConfiguredClasspathVersions(Map<String,Object> conf, List<String> currentClassPath)
Get a map of version to classpath from the conf Config.SUPERVISOR_WORKER_VERSION_CLASSPATH_MAP
- Parameters:
conf - what to read it out of
currentClassPath - the current classpath for this version of storm (not included in the conf, but returned by this)
- Returns:
the map
-
getAlternativeVersionsMap
public static NavigableMap<String,IVersionInfo> getAlternativeVersionsMap(Map<String,Object> conf)
Get a mapping of the configured supported versions of storm to their actual versions.
- Parameters:
conf - what to read the configuration out of.
- Returns:
the map.
-
getConfiguredWorkerMainVersions
public static NavigableMap<SimpleVersion,String> getConfiguredWorkerMainVersions(Map<String,Object> conf)
Get a map of version to worker main from the conf Config.SUPERVISOR_WORKER_VERSION_MAIN_MAP
- Parameters:
conf - what to read it out of
- Returns:
the map
-
getConfiguredWorkerLogWriterVersions
public static NavigableMap<SimpleVersion,String> getConfiguredWorkerLogWriterVersions(Map<String,Object> conf)
Get a map of version to worker log writer from the conf Config.SUPERVISOR_WORKER_VERSION_LOGWRITER_MAP
- Parameters:
conf - what to read it out of
- Returns:
the map
-
getCompatibleVersion
public static <T> T getCompatibleVersion(NavigableMap<SimpleVersion,T> versionedMap, SimpleVersion desiredVersion, String what, T defaultValue)
-
getConfigFromClasspath
public static Map<String,Object> getConfigFromClasspath(List<String> cp, Map<String,Object> conf) throws IOException
- Throws:
IOException
-
isLocalhostAddress
public static boolean isLocalhostAddress(String address)
-
merge
public static <K,V> Map<K,V> merge(Map<? extends K,? extends V> first, Map<? extends K,? extends V> other)
-
forceDeleteImpl
protected void forceDeleteImpl(String path) throws IOException
- Throws:
IOException
-
makeUptimeComputerImpl
public Utils.UptimeComputer makeUptimeComputerImpl()
-
localHostnameImpl
protected String localHostnameImpl() throws UnknownHostException
- Throws:
UnknownHostException
-
hostnameImpl
protected String hostnameImpl() throws UnknownHostException
- Throws:
UnknownHostException
-
isValidKey
public static boolean isValidKey(String key)
Validates blob key.
- Parameters:
key - Key for the blob.
-
validateTopologyName
public static void validateTopologyName(String name) throws IllegalArgumentException
Validates topology name.
- Parameters:
name - the topology name
- Throws:
IllegalArgumentException - if the topology name is not valid
-
findComponentCycles
public static List<List<String>> findComponentCycles(StormTopology topology, String topoId)
Find and return component cycles in the topology graph when starting from the spouts. Each cycle may consist of one or more components. Components that cannot be reached from any of the spouts are ignored.
- Returns:
a List of cycles. Each cycle has a list of component names.
-
validateCycleFree
public static void validateCycleFree(StormTopology topology, String name) throws InvalidTopologyException
Validate that the topology is cycle free. If not, then throw an InvalidTopologyException describing the cycle(s).
- Parameters:
topology - StormTopology instance to examine.
name - Name of the topology, used in exception error message.
- Throws:
InvalidTopologyException - if there are cycles, with message describing the cycles encountered.
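The idea behind findComponentCycles and validateCycleFree can be illustrated with a depth-first search over a plain adjacency map. The sketch below is not Storm's algorithm: the class CycleSketch, the Map-based graph standing in for StormTopology, and IllegalStateException standing in for InvalidTopologyException are all assumptions. It reports one cycle per back edge met during the search, which is enough to decide whether the graph is cycle free but does not enumerate every possible cycle.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch, not Storm code. graph maps a component to its downstream components.
final class CycleSketch {
    static List<List<String>> findCycles(Map<String, List<String>> graph, Collection<String> spouts) {
        List<List<String>> cycles = new ArrayList<>();
        Set<String> visited = new HashSet<>();
        for (String spout : spouts) {   // only components reachable from a spout are explored
            dfs(spout, graph, new ArrayList<>(), visited, cycles);
        }
        return cycles;
    }

    private static void dfs(String node, Map<String, List<String>> graph, List<String> path,
                            Set<String> visited, List<List<String>> cycles) {
        int idx = path.indexOf(node);
        if (idx >= 0) {                 // node is on the current path: a cycle is closed
            cycles.add(new ArrayList<>(path.subList(idx, path.size())));
            return;
        }
        if (!visited.add(node)) {
            return;                     // already explored via another route
        }
        path.add(node);
        for (String next : graph.getOrDefault(node, Collections.emptyList())) {
            dfs(next, graph, path, visited, cycles);
        }
        path.remove(path.size() - 1);   // backtrack
    }

    static void validateCycleFree(Map<String, List<String>> graph, Collection<String> spouts, String name) {
        List<List<String>> cycles = findCycles(graph, spouts);
        if (!cycles.isEmpty()) {
            throw new IllegalStateException("Topology " + name + " contains cycles: " + cycles);
        }
    }
}
```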
-
-