159 lines
5.2 KiB

#
# We use a background thread for sharing fds on Unix, and for sharing sockets on
# Windows.
#
# A client which wants to pickle a resource registers it with the resource
# sharer and gets an identifier in return. The unpickling process connects
# to the resource sharer, sends the identifier and its pid, and then receives
# the resource.
#
import os
import signal
import socket
import sys
import threading
from . import process
from .context import reduction
from . import util
__all__ = ['stop']

if sys.platform != 'win32':
    __all__.append('DupFd')

    class DupFd(object):
        '''Holder for a duplicated fd that can be fetched later.'''

        def __init__(self, fd):
            # Work on a private duplicate so the caller's fd is left alone.
            duplicate = os.dup(fd)

            def _transfer(conn, pid):
                reduction.send_handle(conn, duplicate, pid)

            def _release():
                os.close(duplicate)

            # The sharer hands back the identifier used later by detach().
            self._id = _resource_sharer.register(_transfer, _release)

        def detach(self):
            '''Fetch the fd from the sharer. Call this at most once.'''
            with _resource_sharer.get_connection(self._id) as conn:
                return reduction.recv_handle(conn)

else:
    __all__.append('DupSocket')

    class DupSocket(object):
        '''Picklable holder for a duplicated socket.'''

        def __init__(self, sock):
            # Work on a private duplicate so the caller's socket is left alone.
            duplicate = sock.dup()

            def _transfer(conn, pid):
                conn.send_bytes(duplicate.share(pid))

            # The sharer hands back the identifier used later by detach().
            self._id = _resource_sharer.register(_transfer, duplicate.close)

        def detach(self):
            '''Fetch the socket from the sharer. Call this at most once.'''
            with _resource_sharer.get_connection(self._id) as conn:
                return socket.fromshare(conn.recv_bytes())
class _ResourceSharer(object):
    '''Manager for resources using background thread.

    Each registered resource is stored as a ``(send, close)`` pair of
    callables under an integer key.  A background thread accepts
    connections on a listener, reads a ``(key, pid)`` request, calls the
    matching ``send(conn, pid)`` and then ``close()``.
    '''

    def __init__(self):
        self._key = 0                   # last key handed out by register()
        self._cache = {}                # key -> (send, close)
        self._old_locks = []            # replaced locks kept alive after fork
        self._lock = threading.Lock()
        self._listener = None
        self._address = None
        self._thread = None
        util.register_after_fork(self, _ResourceSharer._afterfork)

    def register(self, send, close):
        '''Register resource, returning an identifier.

        *send* is later called as ``send(conn, pid)`` and *close* as
        ``close()``.  The listener and serving thread are started on the
        first registration.  The identifier is ``(address, key)``.
        '''
        with self._lock:
            if self._address is None:
                self._start()
            self._key += 1
            self._cache[self._key] = (send, close)
            return (self._address, self._key)

    @staticmethod
    def get_connection(ident):
        '''Return connection from which to receive identified resource.

        *ident* is the ``(address, key)`` pair returned by register().  The
        key and the pid of the calling process are sent as the request.
        '''
        from .connection import Client
        address, key = ident
        c = Client(address, authkey=process.current_process().authkey)
        c.send((key, os.getpid()))
        return c

    def stop(self, timeout=None):
        '''Stop the background thread and clear registered resources.

        *timeout* is passed to the thread's join(); a warning is logged if
        the thread is still alive afterwards.  Does nothing when no
        listener has been started.
        '''
        from .connection import Client
        with self._lock:
            if self._address is not None:
                # A None message is the shutdown request understood by _serve.
                c = Client(self._address,
                           authkey=process.current_process().authkey)
                c.send(None)
                c.close()
                self._thread.join(timeout)
                if self._thread.is_alive():
                    util.sub_warning('_ResourceSharer thread did '
                                     'not stop when asked')
                self._listener.close()
                self._thread = None
                self._address = None
                self._listener = None
                for _send, close in self._cache.values():
                    close()
                self._cache.clear()

    def _afterfork(self):
        '''Reset state; registered with util.register_after_fork.'''
        for _send, close in self._cache.values():
            close()
        self._cache.clear()
        # If self._lock was locked at the time of the fork, it may be broken
        # -- see issue 6721.  Replace it without letting it be gc'ed.
        self._old_locks.append(self._lock)
        self._lock = threading.Lock()
        if self._listener is not None:
            self._listener.close()
        self._listener = None
        self._address = None
        self._thread = None

    def _start(self):
        '''Create the listener and start the daemon thread running _serve.'''
        from .connection import Listener
        assert self._listener is None, "Already have Listener"
        util.debug('starting listener and thread for sending handles')
        self._listener = Listener(authkey=process.current_process().authkey)
        self._address = self._listener.address
        t = threading.Thread(target=self._serve)
        t.daemon = True
        t.start()
        self._thread = t

    def _serve(self):
        '''Thread body: answer resource requests until a None message.'''
        if hasattr(signal, 'pthread_sigmask'):
            # Block signals in this thread.  range(1, NSIG) can contain
            # numbers the platform rejects, which makes pthread_sigmask
            # emit a RuntimeWarning on newer Pythons; prefer the exact set
            # from valid_signals() and fall back where it is unavailable.
            if hasattr(signal, 'valid_signals'):
                blocked = signal.valid_signals()
            else:
                blocked = range(1, signal.NSIG)
            signal.pthread_sigmask(signal.SIG_BLOCK, blocked)
        while True:
            try:
                with self._listener.accept() as conn:
                    msg = conn.recv()
                    if msg is None:
                        break
                    key, destination_pid = msg
                    # Each resource is handed out once: drop it from the
                    # cache and close it whether or not sending succeeds.
                    send, close = self._cache.pop(key)
                    try:
                        send(conn, destination_pid)
                    finally:
                        close()
            except:
                # Deliberately broad: a failed request must not end the
                # loop.  Report it unless the interpreter is exiting.
                if not util.is_exiting():
                    sys.excepthook(*sys.exc_info())
# Module-level sharer instance used by the wrapper class defined in this module.
_resource_sharer = _ResourceSharer()
# Public alias (listed in __all__) bound to the module-level instance's stop().
stop = _resource_sharer.stop