Class ServerUtils

java.lang.Object
org.apache.storm.utils.ServerUtils

public class ServerUtils extends Object
  • Field Details

    • LOG

      public static final org.slf4j.Logger LOG
    • IS_ON_WINDOWS

      public static final boolean IS_ON_WINDOWS
    • SIGKILL

      public static final int SIGKILL
    • SIGTERM

      public static final int SIGTERM
  • Constructor Details

    • ServerUtils

      public ServerUtils()
  • Method Details

    • setInstance

      public static ServerUtils setInstance(ServerUtils u)
      Provide an instance of this class for delegates to use. To mock out delegated methods, provide an instance of a subclass that overrides the implementation of the delegated method.
      Parameters:
      u - a ServerUtils instance
      Returns:
      the previously set instance
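      The delegate pattern described above can be sketched without Storm on the classpath. DelegatingUtils below is a hypothetical stand-in, not the Storm source: it assumes each static method forwards to an overridable instance method (compare currentClasspath and currentClasspathImpl on this page).

```java
// Hypothetical stand-in for the delegate pattern; not the Storm source.
class DelegatingUtils {
    // Shared instance that the static methods delegate to.
    private static DelegatingUtils instance = new DelegatingUtils();

    // Swaps the delegate and returns the previous one so a test can restore it.
    static DelegatingUtils setInstance(DelegatingUtils u) {
        DelegatingUtils old = instance;
        instance = u;
        return old;
    }

    // Static entry point: forwards to the overridable instance method.
    static String currentClasspath() {
        return instance.currentClasspathImpl();
    }

    // Instance method that a test subclass can override.
    String currentClasspathImpl() {
        return System.getProperty("java.class.path");
    }
}
```

      A test would call setInstance with a subclass that overrides currentClasspathImpl, exercise the static method, and then pass the returned instance back to setInstance to restore the original behavior.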
    • interleaveAll

      public static <T> List<T> interleaveAll(List<List<T>> nodeList)
    • getNimbusBlobStore

      public static BlobStore getNimbusBlobStore(Map<String,Object> conf, NimbusInfo nimbusInfo, ILeaderElector leaderElector)
    • getNimbusBlobStore

      public static BlobStore getNimbusBlobStore(Map<String,Object> conf, String baseDir, NimbusInfo nimbusInfo, ILeaderElector leaderElector)
    • isAbsolutePath

      public static boolean isAbsolutePath(String path)
    • shellCmd

      public static String shellCmd(List<String> command)
      Combines the given strings into a single command string, escaped for a POSIX shell.
      Parameters:
      command - the list of strings to be combined
      Returns:
      the resulting command string
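      As an illustration of escaping for a POSIX shell, the sketch below single-quotes each argument and rewrites an embedded single quote as '\''. PosixQuoteSketch and its methods are hypothetical names, and the exact escaping that shellCmd applies may differ.

```java
import java.util.List;
import java.util.stream.Collectors;

// Illustrative POSIX quoting; the escaping used by shellCmd may differ.
final class PosixQuoteSketch {
    // Wraps one argument in single quotes; an embedded ' becomes '\''.
    static String quote(String arg) {
        return "'" + arg.replace("'", "'\\''") + "'";
    }

    // Quotes every argument and joins the results with single spaces.
    static String join(List<String> command) {
        return command.stream()
                .map(PosixQuoteSketch::quote)
                .collect(Collectors.joining(" "));
    }
}
```

      Inside single quotes the shell treats every character literally, so spaces, semicolons, and dollar signs in an argument cannot start a new command or trigger expansion.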
    • getDiskUsage

      public static long getDiskUsage(File dir)
      Takes an input directory or file and returns its disk usage on the local file system. This is a very basic implementation.
      Parameters:
      dir - the local directory or file whose disk usage is measured
      Returns:
      The total disk usage of the input directory or file
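      A very basic disk-usage computation can be sketched as a recursive sum of file lengths. DiskUsageSketch is a hypothetical name; the sketch counts logical file lengths rather than allocated blocks, does not guard against symlink cycles, and is not necessarily how getDiskUsage is implemented.

```java
import java.io.File;

// Illustrative only: sums logical file lengths under a path.
final class DiskUsageSketch {
    // Returns the total length in bytes; a missing path counts as 0.
    static long usage(File path) {
        if (!path.exists()) {
            return 0L;
        }
        if (path.isFile()) {
            return path.length();
        }
        long total = 0L;
        File[] children = path.listFiles(); // null if not a directory or on I/O error
        if (children != null) {
            for (File child : children) {
                total += usage(child);
            }
        }
        return total;
    }
}
```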
    • getClientBlobStoreForSupervisor

      public static ClientBlobStore getClientBlobStoreForSupervisor(Map<String,Object> conf)
    • currentClasspath

      public static String currentClasspath()
      Returns the value of java.class.path System property. Kept separate for testing.
      Returns:
      the classpath
    • getResourceFromClassloader

      public static URL getResourceFromClassloader(String name)
      Returns the URL of the named resource, looked up through the current thread's classloader.
    • zipDoesContainDir

      public static boolean zipDoesContainDir(String zipfile, String target) throws IOException
      Determines if a zip archive contains a particular directory.
      Parameters:
      zipfile - path to the zipped file
      target - directory being looked for in the zip.
      Returns:
      boolean whether or not the directory exists in the zip.
      Throws:
      IOException
    • getFileOwner

      public static String getFileOwner(String path) throws IOException
      Throws:
      IOException
    • containerFilePath

      public static String containerFilePath(String dir)
    • scriptFilePath

      public static String scriptFilePath(String dir)
    • writeScript

      public static String writeScript(String dir, List<String> command, Map<String,String> environment) throws IOException
      Writes a posix shell script file to be executed in its own process.
      Parameters:
      dir - the directory under which the script is to be written
      command - the command the script is to execute
      environment - optional environment variables to set before running the script's command. May be null.
      Returns:
      the path to the script that has been written
      Throws:
      IOException
    • writeScript

      public static String writeScript(String dir, List<String> command, Map<String,String> environment, String umask) throws IOException
      Writes a posix shell script file to be executed in its own process.
      Parameters:
      dir - the directory under which the script is to be written
      command - the command the script is to execute
      environment - optional environment variables to set before running the script's command. May be null.
      umask - the umask to set before running the script's command. May be null.
      Returns:
      the path to the script that has been written
      Throws:
      IOException
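      To make the roles of the environment and umask parameters concrete, the sketch below renders the text of such a script. ScriptSketch, the #!/bin/sh line, and the export/exec layout are assumptions for illustration; the file that writeScript actually produces may look different. Writing the text to a file under dir is left out.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative rendering of a POSIX shell script; not the Storm implementation.
final class ScriptSketch {
    // Single-quotes a value so the shell treats it literally.
    static String quote(String s) {
        return "'" + s.replace("'", "'\\''") + "'";
    }

    // Builds the script text; environment and umask may be null.
    static String render(List<String> command, Map<String, String> environment, String umask) {
        StringBuilder sb = new StringBuilder("#!/bin/sh\n");
        if (umask != null) {
            sb.append("umask ").append(umask).append('\n');
        }
        if (environment != null) {
            // Sorted copy so the output order is deterministic.
            for (Map.Entry<String, String> e : new TreeMap<>(environment).entrySet()) {
                sb.append("export ").append(e.getKey()).append('=')
                  .append(quote(e.getValue())).append('\n');
            }
        }
        // exec replaces the shell with the command, so the script runs as one process.
        sb.append("exec");
        for (String arg : command) {
            sb.append(' ').append(quote(arg));
        }
        return sb.append('\n').toString();
    }
}
```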
    • execCommand

      public static int execCommand(String... command) throws org.apache.commons.exec.ExecuteException, IOException
      Throws:
      org.apache.commons.exec.ExecuteException
      IOException
    • sendSignalToProcess

      public static void sendSignalToProcess(long lpid, int signum) throws IOException
      Throws:
      IOException
    • killProcessWithSigTerm

      public static void killProcessWithSigTerm(String pid) throws IOException
      Throws:
      IOException
    • forceKillProcess

      public static void forceKillProcess(String pid) throws IOException
      Throws:
      IOException
    • nimbusVersionOfBlob

      public static long nimbusVersionOfBlob(String key, ClientBlobStore cb) throws AuthorizationException, KeyNotFoundException
      Throws:
      AuthorizationException
      KeyNotFoundException
    • canUserReadBlob

      public static boolean canUserReadBlob(ReadableBlobMeta meta, String user, Map<String,Object> conf)
    • unJar

      public static void unJar(File jarFile, File toDir) throws IOException
      Unpack matching files from a jar. Entries inside the jar that do not match the given pattern will be skipped.
      Parameters:
      jarFile - the .jar file to unpack
      toDir - the destination directory into which to unpack the jar
      Throws:
      IOException
    • unTar

      public static void unTar(File inFile, File untarDir, boolean symlinksDisabled) throws IOException
      Given a tar file as input, untars it into the directory passed as the second parameter.

      This utility handles ".tar", ".tar.gz", and ".tgz" files.

      Parameters:
      inFile - the tar file to extract
      untarDir - the directory into which the tar file is extracted
      symlinksDisabled - true if symlinks should be disabled, else false
      Throws:
      IOException
    • unpack

      public static void unpack(File localrsrc, File dst, boolean symLinksDisabled) throws IOException
      Throws:
      IOException
    • extractZipFile

      public static void extractZipFile(ZipFile zipFile, File toDir, String prefix) throws IOException
      Extracts the given file to the given directory. Only zip entries starting with the given prefix are extracted. The prefix is stripped off entry names before extraction.
      Parameters:
      zipFile - The zip file to extract
      toDir - The directory to extract to
      prefix - The prefix to look for in the zip file. If not null, only paths starting with the prefix will be extracted
      Throws:
      IOException
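      The prefix rule can be sketched as a pure function on entry names. PrefixFilterSketch and relativeName are hypothetical names; the sketch only shows how an entry is selected and renamed, not the extraction itself.

```java
import java.util.Optional;

// Illustrative prefix filter for zip entry names; not the Storm implementation.
final class PrefixFilterSketch {
    // Returns the name to extract under, or empty if the entry is skipped.
    static Optional<String> relativeName(String entryName, String prefix) {
        if (prefix == null) {
            return Optional.of(entryName); // no filter: keep the name unchanged
        }
        if (!entryName.startsWith(prefix)) {
            return Optional.empty(); // entry is outside the prefix: skip it
        }
        return Optional.of(entryName.substring(prefix.length())); // strip the prefix
    }
}
```

      With the prefix "resources/", the entry "resources/a.txt" would be extracted as "a.txt" and "lib/x.jar" would be skipped.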
    • unZip

      public static void unZip(File inFile, File toDir) throws IOException
      Given a zip file as input, unzips it into the directory passed as the second parameter.
      Parameters:
      inFile - the zip file to extract
      toDir - the directory into which the zip file is extracted
      Throws:
      IOException
    • zipFileSize

      public static long zipFileSize(File myFile) throws IOException
      Given a zip file as input, returns its size. Only works for zip files whose uncompressed size is less than 4 GB; otherwise returns the size modulo 2^32, per the gzip specification.
      Parameters:
      myFile - The zip file as input
      Returns:
      zip file size as a long
      Throws:
      IOException
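      The modulo 2^32 caveat matches the gzip format (RFC 1952), where the last four bytes of a member (the ISIZE field) hold the uncompressed size modulo 2^32 in little-endian order. The sketch below assumes the size is taken from such a trailer; GzipTrailerSketch and its methods are hypothetical names.

```java
// Illustrative decoding of a 4-byte little-endian size field; not the Storm implementation.
final class GzipTrailerSketch {
    // Decodes the last four bytes of a gzip member as an unsigned 32-bit value.
    static long isize(byte[] lastFourBytes) {
        if (lastFourBytes.length != 4) {
            throw new IllegalArgumentException("need exactly 4 bytes");
        }
        long value = 0L;
        for (int i = 3; i >= 0; i--) {
            // Most significant byte is stored last, so walk backwards.
            value = (value << 8) | (lastFourBytes[i] & 0xFFL);
        }
        return value;
    }

    // What a 32-bit size field can report for a given true size.
    static long reportedSize(long actualSize) {
        return actualSize & 0xFFFFFFFFL; // same as actualSize modulo 2^32 for non-negative sizes
    }
}
```

      Under this assumption a 5 GiB payload would be reported as 1 GiB, which is why the result is only reliable below 4 GB.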
    • isRas

      public static boolean isRas(Map<String,Object> conf)
      Check if the scheduler is resource aware or not.
      Parameters:
      conf - The configuration
      Returns:
      True if it's resource aware; false otherwise
    • getEstimatedWorkerCountForRasTopo

      public static int getEstimatedWorkerCountForRasTopo(Map<String,Object> topoConf, StormTopology topology) throws InvalidTopologyException
      Throws:
      InvalidTopologyException
    • getEstimatedTotalHeapMemoryRequiredByTopo

      public static double getEstimatedTotalHeapMemoryRequiredByTopo(Map<String,Object> topoConf, StormTopology topology) throws InvalidTopologyException
      Throws:
      InvalidTopologyException
    • getComponentParallelism

      public static Map<String,Integer> getComponentParallelism(Map<String,Object> topoConf, StormTopology topology) throws InvalidTopologyException
      Throws:
      InvalidTopologyException
    • getComponentParallelism

      public static int getComponentParallelism(Map<String,Object> topoConf, Object component) throws InvalidTopologyException
      Throws:
      InvalidTopologyException
    • principalNameToSubject

      public static Subject principalNameToSubject(String name)
    • currentClasspathImpl

      public String currentClasspathImpl()
    • getResourceFromClassloaderImpl

      public URL getResourceFromClassloaderImpl(String name)
    • getMemInfoFreeMb

      public static long getMemInfoFreeMb() throws IOException
      Get system free memory in megabytes.
      Returns:
      system free memory in megabytes
      Throws:
      IOException - on I/O exception
    • isProcessAlive

      public static boolean isProcessAlive(long pid, String user) throws IOException
      Is a process alive and running?
      Parameters:
      pid - the PID of the running process
      user - the user that is expected to own that process
      Returns:
      true if it is, else false
      Throws:
      IOException - on any error
    • isAnyProcessAlive

      public static boolean isAnyProcessAlive(Collection<Long> pids, String user) throws IOException
      Are any of the processes alive and running for the specified user? If the collection is empty or null, the return value is trivially false.
      Parameters:
      pids - the PIDs of the running processes
      user - the user that is expected to own that process
      Returns:
      true if any one of the processes is owned by user and alive, else false
      Throws:
      IOException - on I/O exception
    • isAnyProcessAlive

      public static boolean isAnyProcessAlive(Collection<Long> pids, int uid) throws IOException
      Are any of the processes alive and running for the specified userId? If the collection is empty or null, the return value is trivially false.
      Parameters:
      pids - the PIDs of the running processes
      uid - the user ID that is expected to own that process
      Returns:
      true if any one of the processes is owned by user and alive, else false
      Throws:
      IOException - on I/O exception
    • getUserId

      public static int getUserId(String user)
      Get the userId for a user name. This works on POSIX systems by using the "id -u" command. Throws IllegalArgumentException on Windows.
      Parameters:
      user - username to be converted to UID. This is optional, in which case current user is returned.
      Returns:
      UID for the specified user (if supplied), else UID of current user, -1 upon Exception.
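      A minimal sketch of the "id -u" approach follows. UidSketch and its methods are hypothetical names; the sketch returns -1 on any failure and does not reproduce the Windows check described above.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Illustrative UID lookup via "id -u"; not the Storm implementation.
final class UidSketch {
    // Parses the command output; returns -1 if it is not a number.
    static int parseUid(String output) {
        try {
            return Integer.parseInt(output.trim());
        } catch (NumberFormatException e) {
            return -1;
        }
    }

    // Runs "id -u [user]"; a null or empty user means the current user.
    static int uidOf(String user) {
        List<String> cmd = new ArrayList<>(List.of("id", "-u"));
        if (user != null && !user.isEmpty()) {
            cmd.add(user);
        }
        try {
            Process p = new ProcessBuilder(cmd).redirectErrorStream(true).start();
            String out = new String(p.getInputStream().readAllBytes(), StandardCharsets.UTF_8);
            return p.waitFor() == 0 ? parseUid(out) : -1;
        } catch (IOException e) {
            return -1; // command missing or failed to start
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return -1;
        }
    }
}
```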
    • getPathOwnerUid

      public static int getPathOwnerUid(String fpath)
      Get the userId of the owner of the path by running the "ls -dn path" command. This command works on POSIX systems only.
      Parameters:
      fpath - full path to the file or directory.
      Returns:
      UID for the specified path if successful, -1 upon failure.
    • areAllProcessesDead

      public static boolean areAllProcessesDead(Map<String,Object> conf, String user, String workerId, Set<Long> pids) throws IOException
      Find if all processes for the user on workerId are dead. This method attempts to optimize the calls by:

        • checking a collection of process IDs at once
        • using the userId on POSIX systems instead of the user name
      Returns:
      true if all processes for the user are dead on the worker
      Throws:
      IOException - if external commands have exception.
    • isAnyPosixProcessPidDirAlive

      public static boolean isAnyPosixProcessPidDirAlive(Collection<Long> pids, String user) throws IOException
      Find if any of the processes is alive using the existence of the /proc/<pid> directory owned by the supplied user. This is an alternative to the "ps -p pid -u uid" command used in isAnyPosixProcessAlive(Collection, int).

      Processes are tracked using the existence of the directory "/proc/<pid>". For each of the supplied PIDs, the PID directory is checked for existence and ownership by the specified uid.

      Parameters:
      pids - Process IDs that need to be monitored for liveness
      user - the userId that is expected to own that process
      Returns:
      true if any one of the processes is owned by user and alive, else false
      Throws:
      IOException - on I/O exception
    • isAnyPosixProcessPidDirAlive

      public static boolean isAnyPosixProcessPidDirAlive(Collection<Long> pids, String expectedUser, boolean mockFileOwnerToUid) throws IOException
      Find if any of the processes is alive using the existence of the /proc/<pid> directory owned by the supplied expectedUser. This is an alternative to the "ps -p pid -u uid" command used in isAnyPosixProcessAlive(Collection, int).

      Processes are tracked using the existence of the directory "/proc/<pid>". For each of the supplied PIDs, the PID directory is checked for existence and ownership by the specified uid.

      Parameters:
      pids - Process IDs that need to be monitored for liveness
      expectedUser - the userId that is expected to own that process
      mockFileOwnerToUid - if true (used for testing), then convert File.owner to UID
      Returns:
      true if any one of the processes is owned by expectedUser and alive, else false
      Throws:
      IOException - on I/O exception
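      The /proc/<pid> check can be sketched with java.nio.file. ProcDirSketch is a hypothetical name, and the procRoot parameter is an assumption added so the logic can be tried against a temporary directory. Unlike the documented methods, the sketch swallows I/O errors and treats them as "not alive".

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collection;

// Illustrative liveness check via PID directories; not the Storm implementation.
final class ProcDirSketch {
    // True if any <procRoot>/<pid> directory exists and is owned by expectedUser.
    static boolean anyAlive(Path procRoot, Collection<Long> pids, String expectedUser) {
        if (pids == null || pids.isEmpty()) {
            return false; // trivially false for an empty or null collection
        }
        for (long pid : pids) {
            Path dir = procRoot.resolve(Long.toString(pid));
            if (!Files.isDirectory(dir)) {
                continue; // no such process
            }
            try {
                if (Files.getOwner(dir).getName().equals(expectedUser)) {
                    return true;
                }
            } catch (IOException e) {
                // The process may have exited between the two checks; treat it as dead.
            }
        }
        return false;
    }
}
```

      On a Linux host the call would use Paths.get("/proc") as procRoot.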
    • validateTopologyWorkerMaxHeapSizeConfigs

      public static void validateTopologyWorkerMaxHeapSizeConfigs(Map<String,Object> stormConf, StormTopology topology, double defaultWorkerMaxHeapSizeMb) throws InvalidTopologyException
      Throws:
      InvalidTopologyException
    • validateTopologyAckerBundleResource

      public static void validateTopologyAckerBundleResource(Map<String,Object> topoConf, StormTopology topology, String topoName) throws InvalidTopologyException
      The RAS scheduler tries to distribute ackers evenly over workers by adding some ackers to each newly launched worker. This method validates that (Config.TOPOLOGY_RAS_ACKER_EXECUTORS_PER_WORKER * memory for an acker + memory for the biggest topology executor) < max worker heap memory. When RAS schedules an executor to a new worker, it first puts Config.TOPOLOGY_RAS_ACKER_EXECUTORS_PER_WORKER ackers into the worker, so Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB needs to be large enough to accommodate this.
      Parameters:
      topoConf - Topology conf
      topology - Topology (not system topology)
      topoName - The name of the topology
      Throws:
      InvalidTopologyException
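      The inequality in the description can be written out as a small check. AckerBundleCheckSketch and its parameter names are hypothetical; the sketch takes the memory figures as plain numbers and does not read them from a topology configuration.

```java
// Illustrative arithmetic for the acker bundle validation; not the Storm implementation.
final class AckerBundleCheckSketch {
    // True if the acker bundle plus the biggest executor fits strictly under the worker heap limit.
    static boolean fits(int ackersPerWorker, double ackerMemMb,
                        double biggestExecutorMemMb, double maxWorkerHeapMb) {
        double required = ackersPerWorker * ackerMemMb + biggestExecutorMemMb;
        return required < maxWorkerHeapMb;
    }
}
```

      For example, 2 ackers at 128 MB each plus a 256 MB executor need 512 MB, which fits under a 768 MB heap but not under a 512 MB heap, because the comparison is strict.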