import os import sys import stat import select import time import errno try: InterruptedError except NameError: # Alias Python2 exception to Python3 InterruptedError = select.error if sys.version_info[0] >= 3: string_types = (str,) else: string_types = (unicode, str) def is_executable_file(path): """Checks that path is an executable regular file, or a symlink towards one. This is roughly ``os.path isfile(path) and os.access(path, os.X_OK)``. """ # follow symlinks, fpath = os.path.realpath(path) if not os.path.isfile(fpath): # non-files (directories, fifo, etc.) return False mode = os.stat(fpath).st_mode if (sys.platform.startswith('sunos') and os.getuid() == 0): # When root on Solaris, os.X_OK is True for *all* files, irregardless # of their executability -- instead, any permission bit of any user, # group, or other is fine enough. # # (This may be true for other "Unix98" OS's such as HP-UX and AIX) return bool(mode & (stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)) return os.access(fpath, os.X_OK) def which(filename, env=None): '''This takes a given filename; tries to find it in the environment path; then checks if it is executable. This returns the full path to the filename if found and executable. Otherwise this returns None.''' # Special case where filename contains an explicit path. if os.path.dirname(filename) != '' and is_executable_file(filename): return filename if env is None: env = os.environ p = env.get('PATH') if not p: p = os.defpath pathlist = p.split(os.pathsep) for path in pathlist: ff = os.path.join(path, filename) if is_executable_file(ff): return ff return None def split_command_line(command_line): '''This splits a command line into a list of arguments. It splits arguments on spaces, but handles embedded quotes, doublequotes, and escaped characters. It's impossible to do this with a regular expression, so I wrote a little state machine to parse the command line. ''' arg_list = [] arg = '' # Constants to name the states we can be in. state_basic = 0 state_esc = 1 state_singlequote = 2 state_doublequote = 3 # The state when consuming whitespace between commands. state_whitespace = 4 state = state_basic for c in command_line: if state == state_basic or state == state_whitespace: if c == '\\': # Escape the next character state = state_esc elif c == r"'": # Handle single quote state = state_singlequote elif c == r'"': # Handle double quote state = state_doublequote elif c.isspace(): # Add arg to arg_list if we aren't in the middle of whitespace. if state == state_whitespace: # Do nothing. None else: arg_list.append(arg) arg = '' state = state_whitespace else: arg = arg + c state = state_basic elif state == state_esc: arg = arg + c state = state_basic elif state == state_singlequote: if c == r"'": state = state_basic else: arg = arg + c elif state == state_doublequote: if c == r'"': state = state_basic else: arg = arg + c if arg != '': arg_list.append(arg) return arg_list def select_ignore_interrupts(iwtd, owtd, ewtd, timeout=None): '''This is a wrapper around select.select() that ignores signals. If select.select raises a select.error exception and errno is an EINTR error then it is ignored. Mainly this is used to ignore sigwinch (terminal resize). ''' # if select() is interrupted by a signal (errno==EINTR) then # we loop back and enter the select() again. if timeout is not None: end_time = time.time() + timeout while True: try: return select.select(iwtd, owtd, ewtd, timeout) except InterruptedError: err = sys.exc_info()[1] if err.args[0] == errno.EINTR: # if we loop back we have to subtract the # amount of time we already waited. if timeout is not None: timeout = end_time - time.time() if timeout < 0: return([], [], []) else: # something else caused the select.error, so # this actually is an exception. raise def poll_ignore_interrupts(fds, timeout=None): '''Simple wrapper around poll to register file descriptors and ignore signals.''' if timeout is not None: end_time = time.time() + timeout poller = select.poll() for fd in fds: poller.register(fd, select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR) while True: try: timeout_ms = None if timeout is None else timeout * 1000 results = poller.poll(timeout_ms) return [afd for afd, _ in results] except InterruptedError: err = sys.exc_info()[1] if err.args[0] == errno.EINTR: # if we loop back we have to subtract the # amount of time we already waited. if timeout is not None: timeout = end_time - time.time() if timeout < 0: return [] else: # something else caused the select.error, so # this actually is an exception. raise