Package org.apache.storm.utils
Class ServerUtils
- java.lang.Object
-
- org.apache.storm.utils.ServerUtils
-
public class ServerUtils extends Object
-
-
Field Summary
Fields
Modifier and Type Field Description
static boolean
IS_ON_WINDOWS
static org.slf4j.Logger
LOG
static int
SIGKILL
static int
SIGTERM
-
Constructor Summary
Constructors
Constructor Description
ServerUtils()
-
Method Summary
Modifier and Type Method Description
static boolean
areAllProcessesDead(Map<String,Object> conf, String user, String workerId, Set<Long> pids)
Find if all processes for the user on workerId are dead.
static boolean
canUserReadBlob(ReadableBlobMeta meta, String user, Map<String,Object> conf)
static String
containerFilePath(String dir)
static String
currentClasspath()
Returns the value of java.class.path System property.
String
currentClasspathImpl()
static int
execCommand(String... command)
static void
extractZipFile(ZipFile zipFile, File toDir, String prefix)
Extracts the given file to the given directory.
static void
forceKillProcess(String pid)
static ClientBlobStore
getClientBlobStoreForSupervisor(Map<String,Object> conf)
static int
getComponentParallelism(Map<String,Object> topoConf, Object component)
static Map<String,Integer>
getComponentParallelism(Map<String,Object> topoConf, StormTopology topology)
static long
getDiskUsage(File dir)
Takes an input dir or file and returns the disk usage on that local directory.
static double
getEstimatedTotalHeapMemoryRequiredByTopo(Map<String,Object> topoConf, StormTopology topology)
static int
getEstimatedWorkerCountForRasTopo(Map<String,Object> topoConf, StormTopology topology)
static String
getFileOwner(String path)
static long
getMemInfoFreeMb()
Get system free memory in megabytes.
static BlobStore
getNimbusBlobStore(Map<String,Object> conf, String baseDir, NimbusInfo nimbusInfo, ILeaderElector leaderElector)
static BlobStore
getNimbusBlobStore(Map<String,Object> conf, NimbusInfo nimbusInfo, ILeaderElector leaderElector)
static int
getPathOwnerUid(String fpath)
Get the userId of the owner of the path by running "ls -dn path" command.
static URL
getResourceFromClassloader(String name)
Returns the named resource as located by the current thread's classloader.
URL
getResourceFromClassloaderImpl(String name)
static int
getUserId(String user)
Get the userId for a user name.
static <T> List<T>
interleaveAll(List<List<T>> nodeList)
static boolean
isAbsolutePath(String path)
static boolean
isAnyPosixProcessPidDirAlive(Collection<Long> pids, String user)
Find if the process is alive using the existence of /proc/<pid> directory owned by the supplied user.
static boolean
isAnyPosixProcessPidDirAlive(Collection<Long> pids, String expectedUser, boolean mockFileOwnerToUid)
Find if the process is alive using the existence of /proc/<pid> directory owned by the supplied expectedUser.
static boolean
isAnyProcessAlive(Collection<Long> pids, int uid)
Are any of the processes alive and running for the specified userId.
static boolean
isAnyProcessAlive(Collection<Long> pids, String user)
Are any of the processes alive and running for the specified user.
static boolean
isProcessAlive(long pid, String user)
Is a process alive and running?
static boolean
isRas(Map<String,Object> conf)
Check if the scheduler is resource aware or not.
static void
killProcessWithSigTerm(String pid)
static long
nimbusVersionOfBlob(String key, ClientBlobStore cb)
static Subject
principalNameToSubject(String name)
static String
scriptFilePath(String dir)
static void
sendSignalToProcess(long lpid, int signum)
static ServerUtils
setInstance(ServerUtils u)
Provide an instance of this class for delegates to use.
static String
shellCmd(List<String> command)
Returns the combined string, escaped for posix shell.
static void
unJar(File jarFile, File toDir)
Unpack matching files from a jar.
static void
unpack(File localrsrc, File dst, boolean symLinksDisabled)
static void
unTar(File inFile, File untarDir, boolean symlinksDisabled)
Given a tar file as input, untars it into the directory passed as the second parameter.
static void
unZip(File inFile, File toDir)
Given a zip file as input, unzips it into the directory passed as the second parameter.
static void
validateTopologyAckerBundleResource(Map<String,Object> topoConf, StormTopology topology, String topoName)
RAS scheduler will try to distribute ackers evenly over workers by adding some ackers to each newly launched worker.
static void
validateTopologyWorkerMaxHeapSizeConfigs(Map<String,Object> stormConf, StormTopology topology, double defaultWorkerMaxHeapSizeMb)
static String
writeScript(String dir, List<String> command, Map<String,String> environment)
Writes a posix shell script file to be executed in its own process.
static String
writeScript(String dir, List<String> command, Map<String,String> environment, String umask)
Writes a posix shell script file to be executed in its own process.
static boolean
zipDoesContainDir(String zipfile, String target)
Determines if a zip archive contains a particular directory.
static long
zipFileSize(File myFile)
Given a zip file as input, returns its size. Only works for zip files whose uncompressed size is less than 4 GB; otherwise returns the size modulo 2^32, per gzip specifications.
-
-
-
Field Detail
-
LOG
public static final org.slf4j.Logger LOG
-
IS_ON_WINDOWS
public static final boolean IS_ON_WINDOWS
-
SIGKILL
public static final int SIGKILL
- See Also:
- Constant Field Values
-
SIGTERM
public static final int SIGTERM
- See Also:
- Constant Field Values
-
-
Method Detail
-
setInstance
public static ServerUtils setInstance(ServerUtils u)
Provide an instance of this class for delegates to use. To mock out delegated methods, provide an instance of a subclass that overrides the implementation of the delegated method.
- Parameters:
u
- a ServerUtils instance
- Returns:
- the previously set instance
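The delegation pattern described above can be sketched without Storm on the classpath. The class DelegatingUtils below is a hypothetical stand-in, not the ServerUtils source; it only mirrors the shape suggested by setInstance, currentClasspath and currentClasspathImpl.

```java
/** Hypothetical stand-in for a utility class whose static methods delegate to a swappable instance. */
class DelegatingUtils {
    // The instance that static methods forward to; tests may replace it.
    private static DelegatingUtils instance = new DelegatingUtils();

    /** Installs a new delegate and returns the previously set one. */
    public static DelegatingUtils setInstance(DelegatingUtils u) {
        DelegatingUtils old = instance;
        instance = u;
        return old;
    }

    /** Static entry point; forwards to the overridable implementation. */
    public static String currentClasspath() {
        return instance.currentClasspathImpl();
    }

    /** Overridable implementation; a test subclass can return a fixed value instead. */
    public String currentClasspathImpl() {
        return System.getProperty("java.class.path");
    }
}
```

A test would typically keep the value returned by setInstance and restore it afterwards, so that later tests see the real implementation again.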
-
getNimbusBlobStore
public static BlobStore getNimbusBlobStore(Map<String,Object> conf, NimbusInfo nimbusInfo, ILeaderElector leaderElector)
-
getNimbusBlobStore
public static BlobStore getNimbusBlobStore(Map<String,Object> conf, String baseDir, NimbusInfo nimbusInfo, ILeaderElector leaderElector)
-
isAbsolutePath
public static boolean isAbsolutePath(String path)
-
shellCmd
public static String shellCmd(List<String> command)
Returns the combined string, escaped for posix shell.
- Parameters:
command
- the list of strings to be combined
- Returns:
- the resulting command string
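One common way to escape arguments for a posix shell is to wrap each one in single quotes and rewrite embedded single quotes. The sketch below illustrates that technique only; the class ShellQuoteSketch and its quoting rule are assumptions, and the escaping that shellCmd actually applies may differ.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of single-quote escaping for a posix shell command line. */
class ShellQuoteSketch {
    /** Wraps one argument in single quotes; an embedded ' becomes '\'' . */
    static String quote(String arg) {
        return "'" + arg.replace("'", "'\\''") + "'";
    }

    /** Quotes every argument and joins the results with single spaces. */
    static String shellCmd(List<String> command) {
        List<String> quoted = new ArrayList<>();
        for (String arg : command) {
            quoted.add(quote(arg));
        }
        return String.join(" ", quoted);
    }
}
```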
-
getDiskUsage
public static long getDiskUsage(File dir)
Takes an input dir or file and returns the disk usage on that local directory. Very basic implementation.
- Parameters:
dir
- The input directory or file whose disk usage is measured
- Returns:
- The total disk usage of the input directory or file
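A very basic disk usage calculation can be sketched with the JDK alone by summing the sizes of all regular files under a path. DiskUsageSketch is a hypothetical name, and counting logical file sizes in bytes (rather than allocated blocks) is an assumption; the page does not say how getDiskUsage measures usage.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

/** Hypothetical sketch: disk usage as the sum of regular file sizes in bytes. */
class DiskUsageSketch {
    /** Sums the sizes of all regular files at or below the given path. */
    static long diskUsage(Path root) throws IOException {
        try (Stream<Path> paths = Files.walk(root)) {
            return paths.filter(Files::isRegularFile)
                        .mapToLong(p -> {
                            try {
                                return Files.size(p);
                            } catch (IOException e) {
                                // Lambdas cannot throw checked exceptions, so wrap it.
                                throw new UncheckedIOException(e);
                            }
                        })
                        .sum();
        }
    }
}
```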
-
getClientBlobStoreForSupervisor
public static ClientBlobStore getClientBlobStoreForSupervisor(Map<String,Object> conf)
-
currentClasspath
public static String currentClasspath()
Returns the value of java.class.path System property. Kept separate for testing.
- Returns:
- the classpath
-
getResourceFromClassloader
public static URL getResourceFromClassloader(String name)
Returns the named resource as located by the current thread's classloader.
-
zipDoesContainDir
public static boolean zipDoesContainDir(String zipfile, String target) throws IOException
Determines if a zip archive contains a particular directory.
- Parameters:
zipfile
- path to the zipped file
target
- directory being looked for in the zip.
- Returns:
- boolean whether or not the directory exists in the zip.
- Throws:
IOException
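A directory check of this kind can be sketched with java.util.zip by scanning entry names for a directory prefix. ZipDirCheckSketch is a hypothetical class, and treating "any entry whose name starts with target/" as proof that the directory exists is an assumption about the matching rule.

```java
import java.io.IOException;
import java.util.Enumeration;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

/** Hypothetical sketch: does a zip archive contain entries under a given directory? */
class ZipDirCheckSketch {
    /** Returns true if any entry name in the archive starts with "target/". */
    static boolean zipContainsDir(String zipPath, String target) throws IOException {
        String prefix = target.endsWith("/") ? target : target + "/";
        try (ZipFile zip = new ZipFile(zipPath)) {
            Enumeration<? extends ZipEntry> entries = zip.entries();
            while (entries.hasMoreElements()) {
                if (entries.nextElement().getName().startsWith(prefix)) {
                    return true;
                }
            }
        }
        return false;
    }
}
```

Appending the trailing slash before matching keeps a lookup for "res" from matching entries under "resources/".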
-
getFileOwner
public static String getFileOwner(String path) throws IOException
- Throws:
IOException
-
writeScript
public static String writeScript(String dir, List<String> command, Map<String,String> environment) throws IOException
Writes a posix shell script file to be executed in its own process.
- Parameters:
dir
- the directory under which the script is to be written
command
- the command the script is to execute
environment
- optional environment variables to set before running the script's command. May be null.
- Returns:
- the path to the script that has been written
- Throws:
IOException
-
writeScript
public static String writeScript(String dir, List<String> command, Map<String,String> environment, String umask) throws IOException
Writes a posix shell script file to be executed in its own process.
- Parameters:
dir
- the directory under which the script is to be written
command
- the command the script is to execute
environment
- optional environment variables to set before running the script's command. May be null.
umask
- umask to be set. It can be null.
- Returns:
- the path to the script that has been written
- Throws:
IOException
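To make the parameters concrete, here is a minimal sketch of what such a script writer could look like. Everything in it is an assumption for illustration: the class WriteScriptSketch, the /bin/sh shebang, the export and exec layout, the single-quote escaping, and the generated file name are not taken from the ServerUtils source.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of writing a small posix shell script to a directory. */
class WriteScriptSketch {
    /** Single-quote escaping, so values cannot break out of the script line. */
    static String quote(String s) {
        return "'" + s.replace("'", "'\\''") + "'";
    }

    /** Builds the script text: shebang, optional umask, optional exports, then the command. */
    static String scriptBody(List<String> command, Map<String, String> environment, String umask) {
        StringBuilder sb = new StringBuilder("#!/bin/sh\n");
        if (umask != null) {
            sb.append("umask ").append(umask).append('\n');
        }
        if (environment != null) {
            for (Map.Entry<String, String> e : environment.entrySet()) {
                sb.append("export ").append(e.getKey()).append('=')
                  .append(quote(e.getValue())).append('\n');
            }
        }
        sb.append("exec");
        for (String arg : command) {
            sb.append(' ').append(quote(arg));
        }
        return sb.append('\n').toString();
    }

    /** Writes the script under dir and returns the path of the written file. */
    static String writeScript(Path dir, List<String> command,
                              Map<String, String> environment, String umask) throws IOException {
        Path script = Files.createTempFile(dir, "script-", ".sh");
        Files.write(script, scriptBody(command, environment, umask).getBytes(StandardCharsets.UTF_8));
        return script.toString();
    }
}
```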
-
execCommand
public static int execCommand(String... command) throws org.apache.commons.exec.ExecuteException, IOException
- Throws:
org.apache.commons.exec.ExecuteException
IOException
-
sendSignalToProcess
public static void sendSignalToProcess(long lpid, int signum) throws IOException
- Throws:
IOException
-
killProcessWithSigTerm
public static void killProcessWithSigTerm(String pid) throws IOException
- Throws:
IOException
-
forceKillProcess
public static void forceKillProcess(String pid) throws IOException
- Throws:
IOException
-
nimbusVersionOfBlob
public static long nimbusVersionOfBlob(String key, ClientBlobStore cb) throws AuthorizationException, KeyNotFoundException
-
canUserReadBlob
public static boolean canUserReadBlob(ReadableBlobMeta meta, String user, Map<String,Object> conf)
-
unJar
public static void unJar(File jarFile, File toDir) throws IOException
Unpack matching files from a jar. Entries inside the jar that do not match the given pattern will be skipped.
- Parameters:
jarFile
- the .jar file to unpack
toDir
- the destination directory into which to unpack the jar
- Throws:
IOException
-
unTar
public static void unTar(File inFile, File untarDir, boolean symlinksDisabled) throws IOException
Given a tar file as input, untars it into the directory passed as the second parameter. This utility will untar ".tar", ".tar.gz", and ".tgz" files.
- Parameters:
inFile
- The tar file as input
untarDir
- The directory into which to untar the tar file
symlinksDisabled
- true if symlinks should be disabled, else false
- Throws:
IOException
-
unpack
public static void unpack(File localrsrc, File dst, boolean symLinksDisabled) throws IOException
- Throws:
IOException
-
extractZipFile
public static void extractZipFile(ZipFile zipFile, File toDir, String prefix) throws IOException
Extracts the given file to the given directory. Only zip entries starting with the given prefix are extracted. The prefix is stripped off entry names before extraction.
- Parameters:
zipFile
- The zip file to extract
toDir
- The directory to extract to
prefix
- The prefix to look for in the zip file. If not null, only paths starting with the prefix will be extracted
- Throws:
IOException
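The prefix filtering and stripping described above can be sketched with java.util.zip as follows. PrefixExtractSketch is a hypothetical class; the check that rejects entries resolving outside the target directory is an addition of this sketch and is not something this page states about extractZipFile.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Enumeration;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

/** Hypothetical sketch: extract only entries under a prefix, with the prefix stripped. */
class PrefixExtractSketch {
    static void extract(ZipFile zip, Path toDir, String prefix) throws IOException {
        Path root = toDir.toAbsolutePath().normalize();
        Enumeration<? extends ZipEntry> entries = zip.entries();
        while (entries.hasMoreElements()) {
            ZipEntry entry = entries.nextElement();
            if (entry.isDirectory()) {
                continue; // parent directories are created on demand below
            }
            String name = entry.getName();
            if (prefix != null) {
                if (!name.startsWith(prefix)) {
                    continue; // entry is outside the requested prefix
                }
                name = name.substring(prefix.length());
            }
            while (name.startsWith("/")) {
                name = name.substring(1); // keep the path relative to the target directory
            }
            if (name.isEmpty()) {
                continue;
            }
            Path target = root.resolve(name).normalize();
            if (!target.startsWith(root)) {
                // Assumption of this sketch: refuse names such as "../x" that escape toDir.
                throw new IOException("Entry escapes target directory: " + entry.getName());
            }
            Files.createDirectories(target.getParent());
            try (InputStream in = zip.getInputStream(entry)) {
                Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING);
            }
        }
    }
}
```

With prefix "resources/", an entry named "resources/a.txt" is written to a.txt directly under the target directory, while "other/b.txt" is skipped.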
-
unZip
public static void unZip(File inFile, File toDir) throws IOException
Given a zip file as input, unzips it into the directory passed as the second parameter.
- Parameters:
inFile
- The zip file as input
toDir
- The directory into which to unzip the zip file
- Throws:
IOException
-
zipFileSize
public static long zipFileSize(File myFile) throws IOException
Given a zip file as input, returns its size. Only works for zip files whose uncompressed size is less than 4 GB; otherwise returns the size modulo 2^32, per gzip specifications.
- Parameters:
myFile
- The zip file as input
- Returns:
- zip file size as a long
- Throws:
IOException
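The "modulo 2^32" remark matches the gzip format, whose last four bytes (the ISIZE field) store the uncompressed length modulo 2^32 in little-endian order. The sketch below reads that field from a gzip file; GzipSizeSketch is a hypothetical class, and whether zipFileSize obtains its value this way is an assumption based only on the description above.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

/** Hypothetical sketch: read the gzip ISIZE trailer (uncompressed size modulo 2^32). */
class GzipSizeSketch {
    static long uncompressedSizeMod32(File gzipFile) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(gzipFile, "r")) {
            if (raf.length() < 4) {
                throw new IOException("File too short to be gzip: " + gzipFile);
            }
            // ISIZE occupies the final four bytes, least significant byte first.
            raf.seek(raf.length() - 4);
            long b0 = raf.read();
            long b1 = raf.read();
            long b2 = raf.read();
            long b3 = raf.read();
            return (b3 << 24) | (b2 << 16) | (b1 << 8) | b0;
        }
    }
}
```

Because the field is only 32 bits wide, a 5 GB payload would be reported as 5 GB minus 4 GB, which is why the result is only meaningful below 4 GB.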
-
isRas
public static boolean isRas(Map<String,Object> conf)
Check if the scheduler is resource aware or not.
- Parameters:
conf
- The configuration
- Returns:
- True if it's resource aware; false otherwise
-
getEstimatedWorkerCountForRasTopo
public static int getEstimatedWorkerCountForRasTopo(Map<String,Object> topoConf, StormTopology topology) throws InvalidTopologyException
- Throws:
InvalidTopologyException
-
getEstimatedTotalHeapMemoryRequiredByTopo
public static double getEstimatedTotalHeapMemoryRequiredByTopo(Map<String,Object> topoConf, StormTopology topology) throws InvalidTopologyException
- Throws:
InvalidTopologyException
-
getComponentParallelism
public static Map<String,Integer> getComponentParallelism(Map<String,Object> topoConf, StormTopology topology) throws InvalidTopologyException
- Throws:
InvalidTopologyException
-
getComponentParallelism
public static int getComponentParallelism(Map<String,Object> topoConf, Object component) throws InvalidTopologyException
- Throws:
InvalidTopologyException
-
currentClasspathImpl
public String currentClasspathImpl()
-
getMemInfoFreeMb
public static long getMemInfoFreeMb() throws IOException
Get system free memory in megabytes.
- Returns:
- system free memory in megabytes
- Throws:
IOException
- on I/O exception
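On Linux, free memory is commonly read from /proc/meminfo, where values are reported in kB. The sketch below parses only the MemFree line and converts it to megabytes; MemInfoSketch is a hypothetical class, and both the data source and the choice of fields are assumptions, since this page does not say how getMemInfoFreeMb computes its result.

```java
import java.util.List;

/** Hypothetical sketch: extract MemFree from /proc/meminfo-style lines, in megabytes. */
class MemInfoSketch {
    /** Returns the MemFree value in MB, or -1 if no MemFree line is present. */
    static long memFreeMb(List<String> lines) {
        for (String line : lines) {
            if (line.startsWith("MemFree:")) {
                // Example line: "MemFree:         2048000 kB"
                String[] parts = line.trim().split("\\s+");
                long kb = Long.parseLong(parts[1]);
                return kb / 1024;
            }
        }
        return -1;
    }
}
```

On a Linux host the input could come from Files.readAllLines(Paths.get("/proc/meminfo")).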
-
isProcessAlive
public static boolean isProcessAlive(long pid, String user) throws IOException
Is a process alive and running?
- Parameters:
pid
- the PID of the running process
user
- the user that is expected to own that process
- Returns:
- true if it is, else false
- Throws:
IOException
- on any error
-
isAnyProcessAlive
public static boolean isAnyProcessAlive(Collection<Long> pids, String user) throws IOException
Are any of the processes alive and running for the specified user. If collection is empty or null then the return value is trivially false.
- Parameters:
pids
- the PIDs of the running processes
user
- the user that is expected to own that process
- Returns:
- true if any one of the processes is owned by user and alive, else false
- Throws:
IOException
- on I/O exception
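The contract above (false for a null or empty collection, true if any PID is alive and owned by the user) can be illustrated with the JDK's ProcessHandle API, available since Java 9. This is a swapped-in technique for illustration: ProcessAliveSketch is a hypothetical class and does not reproduce how ServerUtils performs the check.

```java
import java.util.Collection;
import java.util.Optional;

/** Hypothetical sketch: liveness and ownership check via java.lang.ProcessHandle. */
class ProcessAliveSketch {
    static boolean isAnyProcessAlive(Collection<Long> pids, String user) {
        if (pids == null || pids.isEmpty()) {
            return false; // trivially false, as the contract states
        }
        for (long pid : pids) {
            Optional<ProcessHandle> handle = ProcessHandle.of(pid);
            if (handle.isPresent() && handle.get().isAlive()) {
                // The owner may be unavailable on some platforms or for other users' processes.
                Optional<String> owner = handle.get().info().user();
                if (owner.isPresent() && owner.get().equals(user)) {
                    return true;
                }
            }
        }
        return false;
    }
}
```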
-
isAnyProcessAlive
public static boolean isAnyProcessAlive(Collection<Long> pids, int uid) throws IOException
Are any of the processes alive and running for the specified userId. If collection is empty or null then the return value is trivially false.
- Parameters:
pids
- the PIDs of the running processes
uid
- the user ID that is expected to own that process
- Returns:
- true if any one of the processes is owned by uid and alive, else false
- Throws:
IOException
- on I/O exception
-
getUserId
public static int getUserId(String user)
Get the userId for a user name. This works on Posix systems by using "id -u" command. Throws IllegalArgumentException on Windows.
- Parameters:
user
- username to be converted to UID. This is optional, in which case current user is returned.
- Returns:
- UID for the specified user (if supplied), else UID of current user, -1 upon Exception.
-
getPathOwnerUid
public static int getPathOwnerUid(String fpath)
Get the userId of the owner of the path by running "ls -dn path" command. This command works on Posix systems only.
- Parameters:
fpath
- full path to the file or directory.
- Returns:
- UID for the specified path if successful, -1 upon failure.
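The output of "ls -dn" lists the numeric owner UID in the third whitespace-separated column. The sketch below shows only that parsing step on a sample line; LsOwnerParseSketch is a hypothetical class, and running the command and handling its errors are left out.

```java
/** Hypothetical sketch: parse the owner UID from one line of "ls -dn" output. */
class LsOwnerParseSketch {
    /** Returns the numeric UID in the third column, or -1 if the line cannot be parsed. */
    static int parseOwnerUid(String lsLine) {
        if (lsLine == null) {
            return -1;
        }
        // Columns: mode, link count, uid, gid, size, date..., name
        String[] cols = lsLine.trim().split("\\s+");
        if (cols.length < 3) {
            return -1;
        }
        try {
            return Integer.parseInt(cols[2]);
        } catch (NumberFormatException e) {
            return -1;
        }
    }
}
```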
-
areAllProcessesDead
public static boolean areAllProcessesDead(Map<String,Object> conf, String user, String workerId, Set<Long> pids) throws IOException
Find if all processes for the user on workerId are dead. This method attempts to optimize the calls by:
- checking a collection of ProcessIds at once
- using userId on Posix systems instead of user
- Returns:
- true if all processes for the user are dead on the worker
- Throws:
IOException
- if external commands have exception.
-
isAnyPosixProcessPidDirAlive
public static boolean isAnyPosixProcessPidDirAlive(Collection<Long> pids, String user) throws IOException
Find if the process is alive using the existence of /proc/<pid> directory owned by the supplied user. This is an alternative to "ps -p pid -u uid" command used in isAnyPosixProcessAlive(Collection, int).
Processes are tracked using the existence of the directory "/proc/<pid>". For each of the supplied PIDs, their PID directory is checked for existence and ownership by the specified uid.
- Parameters:
pids
- Process IDs that need to be monitored for liveness
user
- the user that is expected to own that process
- Returns:
- true if any one of the processes is owned by user and alive, else false
- Throws:
IOException
- on I/O exception
-
isAnyPosixProcessPidDirAlive
public static boolean isAnyPosixProcessPidDirAlive(Collection<Long> pids, String expectedUser, boolean mockFileOwnerToUid) throws IOException
Find if the process is alive using the existence of /proc/<pid> directory owned by the supplied expectedUser. This is an alternative to "ps -p pid -u uid" command used in isAnyPosixProcessAlive(Collection, int).
Processes are tracked using the existence of the directory "/proc/<pid>". For each of the supplied PIDs, their PID directory is checked for existence and ownership by the specified uid.
- Parameters:
pids
- Process IDs that need to be monitored for liveness
expectedUser
- the user that is expected to own that process
mockFileOwnerToUid
- if true (used for testing), then convert File.owner to UID
- Returns:
- true if any one of the processes is owned by expectedUser and alive, else false
- Throws:
IOException
- on I/O exception
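The /proc-based check can be sketched with java.nio.file: a process counts as alive when its /proc/<pid> directory exists and is owned by the expected user. ProcDirSketch is a hypothetical class; comparing owner names rather than numeric UIDs is an assumption of this sketch, and the mockFileOwnerToUid test hook is not modelled.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collection;

/** Hypothetical sketch: liveness via existence and ownership of /proc/<pid>. */
class ProcDirSketch {
    static boolean isAnyPidDirAlive(Collection<Long> pids, String expectedUser) throws IOException {
        if (pids == null) {
            return false;
        }
        for (Long pid : pids) {
            Path procDir = Paths.get("/proc", String.valueOf(pid));
            if (!Files.isDirectory(procDir)) {
                continue; // no directory: the process is gone (or this is not a /proc system)
            }
            try {
                String owner = Files.getOwner(procDir).getName();
                if (owner.equals(expectedUser)) {
                    return true;
                }
            } catch (NoSuchFileException e) {
                // The process exited between the two checks; treat it as dead.
            }
        }
        return false;
    }
}
```

Compared with spawning "ps" for each check, this approach only touches the filesystem, but it relies on a Linux-style /proc being mounted.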
-
validateTopologyWorkerMaxHeapSizeConfigs
public static void validateTopologyWorkerMaxHeapSizeConfigs(Map<String,Object> stormConf, StormTopology topology, double defaultWorkerMaxHeapSizeMb) throws InvalidTopologyException
- Throws:
InvalidTopologyException
-
validateTopologyAckerBundleResource
public static void validateTopologyAckerBundleResource(Map<String,Object> topoConf, StormTopology topology, String topoName) throws InvalidTopologyException
RAS scheduler will try to distribute ackers evenly over workers by adding some ackers to each newly launched worker. Validations are performed here: (Config.TOPOLOGY_RAS_ACKER_EXECUTORS_PER_WORKER * memory for an acker + memory for the biggest topo executor) < max worker heap memory. When RAS tries to schedule an executor to a new worker, it will put Config.TOPOLOGY_RAS_ACKER_EXECUTORS_PER_WORKER ackers into the worker first. So Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB needs to be able to accommodate this.
- Parameters:
topoConf
- Topology conf
topology
- Topology (not system topology)
topoName
- The name of the topology
- Throws:
InvalidTopologyException
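The inequality above is plain arithmetic and can be written out directly. AckerBundleCheckSketch and its parameter names are hypothetical; the real method reads these values from the topology configuration and throws InvalidTopologyException instead of returning a boolean.

```java
/** Hypothetical sketch of the acker bundle heap check described above. */
class AckerBundleCheckSketch {
    /**
     * True when the ackers placed into a new worker, plus the largest single
     * executor of the topology, fit strictly under the worker's maximum heap.
     */
    static boolean fitsInWorker(int ackersPerWorker, double ackerMemMb,
                                double maxExecutorMemMb, double maxWorkerHeapMb) {
        double required = ackersPerWorker * ackerMemMb + maxExecutorMemMb;
        return required < maxWorkerHeapMb;
    }
}
```

For example, with one acker of 128 MB per worker and a largest executor of 512 MB, a 768 MB worker heap passes (640 < 768), while two ackers per worker fail because 768 is not strictly less than 768.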
-
-