511 lines
19 KiB

"""
Low-level operating system functions from :mod:`os`.
Cooperative I/O
===============
This module provides cooperative versions of :func:`os.read` and
:func:`os.write`. These functions are *not* monkey-patched; you
must explicitly call them or monkey patch them yourself.
POSIX functions
---------------
On POSIX, non-blocking IO is available.
- :func:`nb_read`
- :func:`nb_write`
- :func:`make_nonblocking`
All Platforms
-------------
On non-POSIX platforms (e.g., Windows), non-blocking IO is not
available. On those platforms (and on POSIX), cooperative IO can
be done with the threadpool.
- :func:`tp_read`
- :func:`tp_write`
Child Processes
===============
The functions :func:`fork` and (on POSIX) :func:`forkpty` and :func:`waitpid` can be used
to manage child processes.
.. warning::
Forking a process that uses greenlets does not eliminate all non-running
greenlets. Any that were scheduled in the hub of the forking thread in the parent
remain scheduled in the child; compare this to how normal threads operate. (This behaviour
may change is a subsequent major release.)
"""
from __future__ import absolute_import
import os
import sys
from gevent.hub import _get_hub_noargs as get_hub
from gevent.hub import reinit
from gevent._config import config
from gevent._compat import PY3
from gevent._util import copy_globals
import errno
EAGAIN = getattr(errno, 'EAGAIN', 11)
try:
import fcntl
except ImportError:
fcntl = None
__implements__ = ['fork']
__extensions__ = ['tp_read', 'tp_write']
_read = os.read
_write = os.write
ignored_errors = [EAGAIN, errno.EINTR]
if fcntl:
__extensions__ += ['make_nonblocking', 'nb_read', 'nb_write']
def make_nonblocking(fd):
"""Put the file descriptor *fd* into non-blocking mode if
possible.
:return: A boolean value that evaluates to True if successful.
"""
flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
if not bool(flags & os.O_NONBLOCK):
fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
return True
def nb_read(fd, n):
"""
Read up to *n* bytes from file descriptor *fd*. Return a
byte string containing the bytes read, which may be shorter than
*n*. If end-of-file is reached, an empty string is returned.
The descriptor must be in non-blocking mode.
"""
hub = None
event = None
try:
while 1:
try:
result = _read(fd, n)
return result
except OSError as e:
if e.errno not in ignored_errors:
raise
if not PY3:
sys.exc_clear()
if hub is None:
hub = get_hub()
event = hub.loop.io(fd, 1)
hub.wait(event)
finally:
if event is not None:
event.close()
event = None
hub = None
def nb_write(fd, buf):
"""
Write some number of bytes from buffer *buf* to file
descriptor *fd*. Return the number of bytes written, which may
be less than the length of *buf*.
The file descriptor must be in non-blocking mode.
"""
hub = None
event = None
try:
while 1:
try:
result = _write(fd, buf)
return result
except OSError as e:
if e.errno not in ignored_errors:
raise
if not PY3:
sys.exc_clear()
if hub is None:
hub = get_hub()
event = hub.loop.io(fd, 2)
hub.wait(event)
finally:
if event is not None:
event.close()
event = None
hub = None
def tp_read(fd, n):
"""Read up to *n* bytes from file descriptor *fd*. Return a string
containing the bytes read. If end-of-file is reached, an empty string
is returned.
Reading is done using the threadpool.
"""
return get_hub().threadpool.apply(_read, (fd, n))
def tp_write(fd, buf):
"""Write bytes from buffer *buf* to file descriptor *fd*. Return the
number of bytes written.
Writing is done using the threadpool.
"""
return get_hub().threadpool.apply(_write, (fd, buf))
if hasattr(os, 'fork'):
# pylint:disable=function-redefined,redefined-outer-name
_raw_fork = os.fork
def fork_gevent():
"""
Forks the process using :func:`os.fork` and prepares the
child process to continue using gevent before returning.
.. note::
The PID returned by this function may not be waitable with
either the original :func:`os.waitpid` or this module's
:func:`waitpid` and it may not generate SIGCHLD signals if
libev child watchers are or ever have been in use. For
example, the :mod:`gevent.subprocess` module uses libev
child watchers (which parts of gevent use libev child
watchers is subject to change at any time). Most
applications should use :func:`fork_and_watch`, which is
monkey-patched as the default replacement for
:func:`os.fork` and implements the ``fork`` function of
this module by default, unless the environment variable
``GEVENT_NOWAITPID`` is defined before this module is
imported.
.. versionadded:: 1.1b2
"""
result = _raw_fork()
if not result:
reinit()
return result
def fork():
"""
A wrapper for :func:`fork_gevent` for non-POSIX platforms.
"""
return fork_gevent()
if hasattr(os, 'forkpty'):
_raw_forkpty = os.forkpty
def forkpty_gevent():
"""
Forks the process using :func:`os.forkpty` and prepares the
child process to continue using gevent before returning.
Returns a tuple (pid, master_fd). The `master_fd` is *not* put into
non-blocking mode.
Availability: Some Unix systems.
.. seealso:: This function has the same limitations as :func:`fork_gevent`.
.. versionadded:: 1.1b5
"""
pid, master_fd = _raw_forkpty()
if not pid:
reinit()
return pid, master_fd
forkpty = forkpty_gevent
__implements__.append('forkpty')
__extensions__.append("forkpty_gevent")
if hasattr(os, 'WNOWAIT') or hasattr(os, 'WNOHANG'):
# We can only do this on POSIX
import time
_waitpid = os.waitpid
_WNOHANG = os.WNOHANG
# replaced by the signal module.
_on_child_hook = lambda: None
# {pid -> watcher or tuple(pid, rstatus, timestamp)}
_watched_children = {}
def _on_child(watcher, callback):
# XXX: Could handle tracing here by not stopping
# until the pid is terminated
watcher.stop()
try:
_watched_children[watcher.pid] = (watcher.pid, watcher.rstatus, time.time())
if callback:
callback(watcher)
# dispatch an "event"; used by gevent.signal.signal
_on_child_hook()
# now is as good a time as any to reap children
_reap_children()
finally:
watcher.close()
def _reap_children(timeout=60):
# Remove all the dead children that haven't been waited on
# for the *timeout* seconds.
# Some platforms queue delivery of SIGCHLD for all children that die;
# in that case, a well-behaved application should call waitpid() for each
# signal.
# Some platforms (linux) only guarantee one delivery if multiple children
# die. On that platform, the well-behave application calls waitpid() in a loop
# until it gets back -1, indicating no more dead children need to be waited for.
# In either case, waitpid should be called the same number of times as dead children,
# thus removing all the watchers when a SIGCHLD arrives. The (generous) timeout
# is to work with applications that neglect to call waitpid and prevent "unlimited"
# growth.
# Note that we don't watch for the case of pid wraparound. That is, we fork a new
# child with the same pid as an existing watcher, but the child is already dead,
# just not waited on yet.
now = time.time()
oldest_allowed = now - timeout
dead = [pid for pid, val
in _watched_children.items()
if isinstance(val, tuple) and val[2] < oldest_allowed]
for pid in dead:
del _watched_children[pid]
def waitpid(pid, options):
"""
Wait for a child process to finish.
If the child process was spawned using
:func:`fork_and_watch`, then this function behaves
cooperatively. If not, it *may* have race conditions; see
:func:`fork_gevent` for more information.
The arguments are as for the underlying
:func:`os.waitpid`. Some combinations of *options* may not
be supported cooperatively (as of 1.1 that includes
WUNTRACED). Using a *pid* of 0 to request waiting on only processes
from the current process group is not cooperative.
Availability: POSIX.
.. versionadded:: 1.1b1
.. versionchanged:: 1.2a1
More cases are handled in a cooperative manner.
"""
# pylint: disable=too-many-return-statements
# XXX Does not handle tracing children
# So long as libev's loop doesn't run, it's OK to add
# child watchers. The SIGCHLD handler only feeds events
# for the next iteration of the loop to handle. (And the
# signal handler itself is only called from the next loop
# iteration.)
if pid <= 0:
# magic functions for multiple children.
if pid == -1:
# Any child. If we have one that we're watching and that finished,
# we will use that one. Otherwise, let the OS take care of it.
for k, v in _watched_children.items():
if isinstance(v, tuple):
pid = k
break
if pid <= 0:
# We didn't have one that was ready. If there are
# no funky options set, and the pid was -1
# (meaning any process, not 0, which means process
# group--- libev doesn't know about process
# groups) then we can use a child watcher of pid 0; otherwise,
# pass through to the OS.
if pid == -1 and options == 0:
hub = get_hub()
with hub.loop.child(0, False) as watcher:
hub.wait(watcher)
return watcher.rpid, watcher.rstatus
# There were funky options/pid, so we must go to the OS.
return _waitpid(pid, options)
if pid in _watched_children:
# yes, we're watching it
# Note that the remainder of this code must be careful to NOT
# yield to the event loop except at well known times, or
# we have a race condition between the _on_child callback and the
# code here that could lead to a process to hang.
if options & _WNOHANG or isinstance(_watched_children[pid], tuple):
# We're either asked not to block, or it already finished, in which
# case blocking doesn't matter
result = _watched_children[pid]
if isinstance(result, tuple):
# it finished. libev child watchers
# are one-shot
del _watched_children[pid]
return result[:2]
# it's not finished
return (0, 0)
# Ok, we need to "block". Do so via a watcher so that we're
# cooperative. We know it's our child, etc, so this should work.
watcher = _watched_children[pid]
# We can't start a watcher that's already started,
# so we can't reuse the existing watcher. Notice that the
# old watcher must not have fired already, or during this time, but
# only after we successfully `start()` the watcher. So this must
# not yield to the event loop.
with watcher.loop.child(pid, False) as new_watcher:
get_hub().wait(new_watcher)
# Ok, so now the new watcher is done. That means
# the old watcher's callback (_on_child) should
# have fired, potentially taking this child out of
# _watched_children (but that could depend on how
# many callbacks there were to run, so use the
# watcher object directly; libev sets all the
# watchers at the same time).
return watcher.rpid, watcher.rstatus
# we're not watching it and it may not even be our child,
# so we must go to the OS to be sure to get the right semantics (exception)
# XXX
# libuv has a race condition because the signal
# handler is a Python function, so the InterruptedError
# is raised before the signal handler runs and calls the
# child watcher
# we're not watching it
return _waitpid(pid, options)
def fork_and_watch(callback=None, loop=None, ref=False, fork=fork_gevent):
"""
Fork a child process and start a child watcher for it in the parent process.
This call cooperates with :func:`waitpid` to enable cooperatively waiting
for children to finish. When monkey-patching, these functions are patched in as
:func:`os.fork` and :func:`os.waitpid`, respectively.
In the child process, this function calls :func:`gevent.hub.reinit` before returning.
Availability: POSIX.
:keyword callback: If given, a callable that will be called with the child watcher
when the child finishes.
:keyword loop: The loop to start the watcher in. Defaults to the
loop of the current hub.
:keyword fork: The fork function. Defaults to :func:`the one defined in this
module <gevent.os.fork_gevent>` (which automatically calls :func:`gevent.hub.reinit`).
Pass the builtin :func:`os.fork` function if you do not need to
initialize gevent in the child process.
.. versionadded:: 1.1b1
.. seealso::
:func:`gevent.monkey.get_original` To access the builtin :func:`os.fork`.
"""
pid = fork()
if pid:
# parent
loop = loop or get_hub().loop
watcher = loop.child(pid, ref=ref)
_watched_children[pid] = watcher
watcher.start(_on_child, watcher, callback)
return pid
__extensions__.append('fork_and_watch')
__extensions__.append('fork_gevent')
if 'forkpty' in __implements__:
def forkpty_and_watch(callback=None, loop=None, ref=False, forkpty=forkpty_gevent):
"""
Like :func:`fork_and_watch`, except using :func:`forkpty_gevent`.
Availability: Some Unix systems.
.. versionadded:: 1.1b5
"""
result = []
def _fork():
pid_and_fd = forkpty()
result.append(pid_and_fd)
return pid_and_fd[0]
fork_and_watch(callback, loop, ref, _fork)
return result[0]
__extensions__.append('forkpty_and_watch')
# Watch children by default
if not config.disable_watch_children:
# Broken out into separate functions instead of simple name aliases
# for documentation purposes.
def fork(*args, **kwargs):
"""
Forks a child process and starts a child watcher for it in the
parent process so that ``waitpid`` and SIGCHLD work as expected.
This implementation of ``fork`` is a wrapper for :func:`fork_and_watch`
when the environment variable ``GEVENT_NOWAITPID`` is *not* defined.
This is the default and should be used by most applications.
.. versionchanged:: 1.1b2
"""
# take any args to match fork_and_watch
return fork_and_watch(*args, **kwargs)
if 'forkpty' in __implements__:
def forkpty(*args, **kwargs):
"""
Like :func:`fork`, but using :func:`forkpty_gevent`.
This implementation of ``forkpty`` is a wrapper for :func:`forkpty_and_watch`
when the environment variable ``GEVENT_NOWAITPID`` is *not* defined.
This is the default and should be used by most applications.
.. versionadded:: 1.1b5
"""
# take any args to match fork_and_watch
return forkpty_and_watch(*args, **kwargs)
__implements__.append("waitpid")
else:
def fork():
"""
Forks a child process, initializes gevent in the child,
but *does not* prepare the parent to wait for the child or receive SIGCHLD.
This implementation of ``fork`` is a wrapper for :func:`fork_gevent`
when the environment variable ``GEVENT_NOWAITPID`` *is* defined.
This is not recommended for most applications.
"""
return fork_gevent()
if 'forkpty' in __implements__:
def forkpty():
"""
Like :func:`fork`, but using :func:`os.forkpty`
This implementation of ``forkpty`` is a wrapper for :func:`forkpty_gevent`
when the environment variable ``GEVENT_NOWAITPID`` *is* defined.
This is not recommended for most applications.
.. versionadded:: 1.1b5
"""
return forkpty_gevent()
__extensions__.append("waitpid")
else:
__implements__.remove('fork')
__imports__ = copy_globals(os, globals(),
names_to_ignore=__implements__ + __extensions__,
dunder_names_to_keep=())
__all__ = list(set(__implements__ + __extensions__))