72 lines
2.3 KiB
72 lines
2.3 KiB
6 years ago
|
import signal
|
||
|
import weakref
|
||
|
|
||
|
from functools import wraps
|
||
|
|
||
|
__unittest = True
|
||
|
|
||
|
|
||
|
class _InterruptHandler(object):
|
||
|
def __init__(self, default_handler):
|
||
|
self.called = False
|
||
|
self.original_handler = default_handler
|
||
|
if isinstance(default_handler, int):
|
||
|
if default_handler == signal.SIG_DFL:
|
||
|
# Pretend it's signal.default_int_handler instead.
|
||
|
default_handler = signal.default_int_handler
|
||
|
elif default_handler == signal.SIG_IGN:
|
||
|
# Not quite the same thing as SIG_IGN, but the closest we
|
||
|
# can make it: do nothing.
|
||
|
def default_handler(unused_signum, unused_frame):
|
||
|
pass
|
||
|
else:
|
||
|
raise TypeError("expected SIGINT signal handler to be "
|
||
|
"signal.SIG_IGN, signal.SIG_DFL, or a "
|
||
|
"callable object")
|
||
|
self.default_handler = default_handler
|
||
|
|
||
|
def __call__(self, signum, frame):
|
||
|
installed_handler = signal.getsignal(signal.SIGINT)
|
||
|
if installed_handler is not self:
|
||
|
# if we aren't the installed handler, then delegate immediately
|
||
|
# to the default handler
|
||
|
self.default_handler(signum, frame)
|
||
|
|
||
|
if self.called:
|
||
|
self.default_handler(signum, frame)
|
||
|
self.called = True
|
||
|
for result in _results.keys():
|
||
|
result.stop()
|
||
|
|
||
|
_results = weakref.WeakKeyDictionary()
|
||
|
def registerResult(result):
|
||
|
_results[result] = 1
|
||
|
|
||
|
def removeResult(result):
|
||
|
return bool(_results.pop(result, None))
|
||
|
|
||
|
_interrupt_handler = None
|
||
|
def installHandler():
|
||
|
global _interrupt_handler
|
||
|
if _interrupt_handler is None:
|
||
|
default_handler = signal.getsignal(signal.SIGINT)
|
||
|
_interrupt_handler = _InterruptHandler(default_handler)
|
||
|
signal.signal(signal.SIGINT, _interrupt_handler)
|
||
|
|
||
|
|
||
|
def removeHandler(method=None):
|
||
|
if method is not None:
|
||
|
@wraps(method)
|
||
|
def inner(*args, **kwargs):
|
||
|
initial = signal.getsignal(signal.SIGINT)
|
||
|
removeHandler()
|
||
|
try:
|
||
|
return method(*args, **kwargs)
|
||
|
finally:
|
||
|
signal.signal(signal.SIGINT, initial)
|
||
|
return inner
|
||
|
|
||
|
global _interrupt_handler
|
||
|
if _interrupt_handler is not None:
|
||
|
signal.signal(signal.SIGINT, _interrupt_handler.original_handler)
|