# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

"""Implements ThreadPoolExecutor."""

__author__ = 'Brian Quinlan (brian@sweetapp.com)'

from concurrent.futures import _base
import itertools
import queue
import threading
import types
import weakref
import os


# Maps each worker thread to the work queue it reads from. Keys are held
# weakly, so an entry does not keep its thread object alive.
_threads_queues = weakref.WeakKeyDictionary()
# Set to True by _python_exit(); checked by submit() and by the workers.
_shutdown = False
# Lock that ensures that new workers are not created while the interpreter is
# shutting down. Must be held while mutating _threads_queues and _shutdown.
_global_shutdown_lock = threading.Lock()


def _python_exit():
    """Flag interpreter shutdown, wake every worker, then join them all.

    A ``None`` is put on each registered work queue so that a worker blocked
    in ``get()`` wakes up and sees the ``_shutdown`` flag.
    """
    global _shutdown
    with _global_shutdown_lock:
        _shutdown = True
    # Snapshot the mapping so the two passes below see the same threads.
    items = list(_threads_queues.items())
    # Wake all workers first, and only then join, so that no join waits on a
    # thread that has not been woken yet.
    for t, q in items:
        q.put(None)
    for t, q in items:
        t.join()


# Register for `_python_exit()` to be called just before joining all
# non-daemon threads. This is used instead of `atexit.register()` for
# compatibility with subinterpreters, which no longer support daemon threads.
# See bpo-39812 for context.
threading._register_atexit(_python_exit)

# At fork, reinitialize the `_global_shutdown_lock` lock in the child process
if hasattr(os, 'register_at_fork'):
    os.register_at_fork(before=_global_shutdown_lock.acquire,
                        after_in_child=_global_shutdown_lock._at_fork_reinit,
                        after_in_parent=_global_shutdown_lock.release)


class _WorkItem(object):
    """A queued call: a future paired with the callable and its arguments."""

    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def run(self):
        """Call ``fn(*args, **kwargs)`` and store the outcome on the future.

        Does nothing when ``future.set_running_or_notify_cancel()`` returns a
        false value. Any ``BaseException`` raised by ``fn`` is recorded with
        ``future.set_exception()`` rather than propagated.
        """
        if not self.future.set_running_or_notify_cancel():
            return

        try:
            result = self.fn(*self.args, **self.kwargs)
        except BaseException as exc:
            self.future.set_exception(exc)
            # Break a reference cycle with the exception 'exc'
            self = None
        else:
            self.future.set_result(result)

    # Allows the subscripted form ``_WorkItem[...]``.
    __class_getitem__ = classmethod(types.GenericAlias)


def _worker(executor_reference, work_queue, initializer, initargs):
    """Body of a worker thread: run work items until told to exit.

    Args:
        executor_reference: A callable (a weak reference, as created in
            ThreadPoolExecutor._adjust_thread_count) returning the owning
            executor, or None once it has been collected.
        work_queue: The queue to read from. Items are _WorkItem instances
            or None, which is a wake-up signal.
        initializer: Optional callable run once before any work item.
        initargs: Positional arguments passed to the initializer.
    """
    if initializer is not None:
        try:
            initializer(*initargs)
        except BaseException:
            _base.LOGGER.critical('Exception in initializer:', exc_info=True)
            executor = executor_reference()
            if executor is not None:
                executor._initializer_failed()
            return
    try:
        while True:
            work_item = work_queue.get(block=True)
            if work_item is not None:
                work_item.run()
                # Delete references to object. See issue16284
                del work_item

                # attempt to increment idle count
                executor = executor_reference()
                if executor is not None:
                    executor._idle_semaphore.release()
                # Drop the strong reference before blocking on the queue
                # again.
                del executor
                continue

            # A None item is only a wake-up: check whether to exit.
            executor = executor_reference()
            # Exit if:
            #   - The interpreter is shutting down OR
            #   - The executor that owns the worker has been collected OR
            #   - The executor that owns the worker has been shutdown.
            if _shutdown or executor is None or executor._shutdown:
                # Flag the executor as shutting down as early as possible if it
                # is not gc-ed yet.
                if executor is not None:
                    executor._shutdown = True
                # Notice other workers
                work_queue.put(None)
                return
            del executor
    except BaseException:
        _base.LOGGER.critical('Exception in worker', exc_info=True)


class BrokenThreadPool(_base.BrokenExecutor):
    """
    Raised when a worker thread in a ThreadPoolExecutor failed initializing.
    """


class ThreadPoolExecutor(_base.Executor):
    """Executor that runs submitted calls on a pool of worker threads.

    Threads are started from submit(), one at a time, up to max_workers.
    """

    # Used to assign unique thread names when thread_name_prefix is not supplied.
    _counter = itertools.count().__next__

    def __init__(self, max_workers=None, thread_name_prefix='',
                 initializer=None, initargs=()):
        """Initializes a new ThreadPoolExecutor instance.

        Args:
            max_workers: The maximum number of threads that can be used to
                execute the given calls.
            thread_name_prefix: An optional name prefix to give our threads.
            initializer: A callable used to initialize worker threads.
            initargs: A tuple of arguments to pass to the initializer.

        Raises:
            ValueError: If max_workers is less than or equal to 0.
            TypeError: If initializer is given but is not callable.
        """
        if max_workers is None:
            # ThreadPoolExecutor is often used to:
            # * CPU bound task which releases GIL
            # * I/O bound task (which releases GIL, of course)
            #
            # We use cpu_count + 4 for both types of tasks.
            # But we limit it to 32 to avoid consuming surprisingly large resource
            # on many core machine.
            max_workers = min(32, (os.cpu_count() or 1) + 4)
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")

        if initializer is not None and not callable(initializer):
            raise TypeError("initializer must be a callable")

        self._max_workers = max_workers
        self._work_queue = queue.SimpleQueue()
        # Released by a worker after each work item it finishes; submit()
        # tries a non-blocking acquire before starting another thread.
        self._idle_semaphore = threading.Semaphore(0)
        self._threads = set()
        # False, or the error message string once an initializer has failed.
        self._broken = False
        self._shutdown = False
        self._shutdown_lock = threading.Lock()
        self._thread_name_prefix = (thread_name_prefix or
                                    ("ThreadPoolExecutor-%d" % self._counter()))
        self._initializer = initializer
        self._initargs = initargs

    # The docstring is copied from the base class below, so this method is
    # documented with comments only.
    def submit(self, fn, /, *args, **kwargs):
        # Both locks are held for the whole operation, so the state checks,
        # the enqueue and the thread start happen together.
        with self._shutdown_lock, _global_shutdown_lock:
            if self._broken:
                raise BrokenThreadPool(self._broken)

            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')
            if _shutdown:
                raise RuntimeError('cannot schedule new futures after '
                                   'interpreter shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._work_queue.put(w)
            self._adjust_thread_count()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def _adjust_thread_count(self):
        """Start one more worker thread unless an idle one is available.

        No thread is started when the idle semaphore can be acquired without
        blocking, or when the pool already has max_workers threads.
        """
        # if idle threads are available, don't spin new threads
        if self._idle_semaphore.acquire(timeout=0):
            return

        # When the executor gets lost, the weakref callback will wake up
        # the worker threads.
        def weakref_cb(_, q=self._work_queue):
            q.put(None)

        num_threads = len(self._threads)
        if num_threads < self._max_workers:
            thread_name = '%s_%d' % (self._thread_name_prefix or self,
                                     num_threads)
            # The worker gets only a weak reference to this executor, so the
            # thread itself does not keep the executor alive.
            t = threading.Thread(name=thread_name, target=_worker,
                                 args=(weakref.ref(self, weakref_cb),
                                       self._work_queue,
                                       self._initializer,
                                       self._initargs))
            t.start()
            self._threads.add(t)
            # Register the thread so _python_exit() can wake and join it.
            _threads_queues[t] = self._work_queue

    def _initializer_failed(self):
        """Mark the pool as broken and fail every work item still queued."""
        with self._shutdown_lock:
            self._broken = ('A thread initializer failed, the thread pool '
                            'is not usable anymore')
            # Drain work queue and mark pending futures failed
            while True:
                try:
                    work_item = self._work_queue.get_nowait()
                except queue.Empty:
                    break
                # None entries are wake-up signals and carry no future.
                if work_item is not None:
                    work_item.future.set_exception(BrokenThreadPool(self._broken))

    # The docstring is copied from the base class below, so this method is
    # documented with comments only.
    def shutdown(self, wait=True, *, cancel_futures=False):
        with self._shutdown_lock:
            self._shutdown = True
            if cancel_futures:
                # Drain all work items from the queue, and then cancel their
                # associated futures.
                while True:
                    try:
                        work_item = self._work_queue.get_nowait()
                    except queue.Empty:
                        break
                    if work_item is not None:
                        work_item.future.cancel()

            # Send a wake-up to prevent threads calling
            # _work_queue.get(block=True) from permanently blocking.
            self._work_queue.put(None)
        # Join after releasing the lock.
        if wait:
            for t in self._threads:
                t.join()
    shutdown.__doc__ = _base.Executor.shutdown.__doc__