查看原文
其他

python 实现超时退出

carry麦 码农真经 2023-12-25


使用场景



我今天使用python去创建命名管道,然后用open函数打开命名管道,如果一个进程去用'r'模式打开命名管道,它就会阻塞,直到有个进程用'w'模式打开同一个命名管道。
但是由于这里有两个进程,并不能保证,两个进程一定都成功运行,因此就要引入一个超时的办法,超时后自动失败。
而open函数并没有提供超时的功能,因此需要自己写一个。

代码实现




# FORK FROM: https://github.com/glenfant/stopit
__all__ = ['TimeoutException', 'TimeoutContextState', 'BaseTimeout', 'SignalTimeout', 'ThreadingTimeout']
import ctypesimport enumimport functoolsimport signalimport sysimport threadingimport warnings
from abc import abstractmethod, ABCfrom typing import Union

class TimeoutException(Exception): """Raised when the block under context management takes longer to complete than the allowed maximum timeout value. """ pass

class TimeoutContextState(enum.Enum): INITIALIZED: int = 0 EXECUTING: int = 1 TIMED_OUT: int = 2 INTERRUPTED: int = 3 CANCELED: int = 4

class BaseTimeout(ABC): """Context manager for limiting in the time the execution of a block :param seconds: ``float`` or ``int`` duration enabled to run the context manager block :param swallow_exc: ``False`` if you want to manage the ``TimeoutException`` (or any other) in an outer ``try ... except`` structure. ``True`` (default) if you just want to check the execution of the block with the ``state`` attribute of the context manager. """ state: TimeoutContextState
def __init__(self, seconds: Union[int, float], suppress_exc: bool = False): self.seconds = seconds self.suppress_exc = suppress_exc self.state = TimeoutContextState.INITIALIZED
def __bool__(self) -> bool: return self.state in (TimeoutContextState.INITIALIZED, TimeoutContextState.EXECUTING)
def __repr__(self) -> str: return '<%s object at 0x%x in state: %r>' % ( type(self).__qualname__, id(self), self.state)
def __enter__(self): self.state = TimeoutContextState.EXECUTING self.setup_interrupt() return self
def __exit__(self, exc_type, exc_val, exc_tb): if exc_type is TimeoutException: if self.state is not TimeoutContextState.TIMED_OUT: self.state = TimeoutContextState.INTERRUPTED self.suppress_interrupt() return self.suppress_exc else: if exc_type is None: self.state = TimeoutContextState.INITIALIZED self.suppress_interrupt()
def cancel(self): "In case in the block you realize you don't need anymore limitation" self.state = TimeoutContextState.CANCELED self.suppress_interrupt()
@abstractmethod def setup_interrupt(self): "Installs/initializes the feature that interrupts the executed block" raise NotImplementedError
@abstractmethod def suppress_interrupt(self): "Removes/neutralizes the feature that interrupts the executed block" raise NotImplementedError

class SignalTimeout(BaseTimeout): """Context manager for limiting in the time the execution of a block using signal.SIGALRM Unix signal. See :class:`stopit.utils.BaseTimeout` for more information """ def __init__(self, seconds: Union[int, float], suppress_exc: bool = False): trunc_seconds = int(seconds) if trunc_seconds != seconds: warnings.warn('NOTE: alarm delay for signal MUST be int, ' '`seconds` has been truncated automatically') super().__init__(trunc_seconds, suppress_exc)
def handle_timeout(self, signum, frame): self.state = TimeoutContextState.TIMED_OUT raise TimeoutException('Block exceeded maximum timeout ' 'value (%d seconds).' % self.seconds)
def setup_interrupt(self): signal.signal(signal.SIGALRM, self.handle_timeout) signal.alarm(self.seconds)
def suppress_interrupt(self): signal.alarm(0) signal.signal(signal.SIGALRM, signal.SIG_DFL)

def async_raise(target_tid, exception): """Raises an asynchronous exception in another thread. Read http://docs.python.org/c-api/init.html#PyThreadState_SetAsyncExc for further enlightenments. :param target_tid: target thread identifier :param exception: Exception class to be raised in that thread """ # Ensuring and releasing GIL are useless since we're not in C # gil_state = ctypes.pythonapi.PyGILState_Ensure() ret = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(target_tid), ctypes.py_object(exception)) # ctypes.pythonapi.PyGILState_Release(gil_state) if ret == 0: raise ValueError("Invalid thread ID {}".format(target_tid)) elif ret > 1: ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(target_tid), None) raise SystemError("PyThreadState_SetAsyncExc failed")

class ThreadingTimeout(BaseTimeout): """Context manager for limiting in the time the execution of a block using asynchronous threads launching exception. See :class:`stopit.utils.BaseTimeout` for more information """ def __init__(self, seconds: Union[int, float], suppress_exc: bool = False): super().__init__(seconds, suppress_exc) self.target_tid = threading.current_thread().ident self.timer = None # PEP8
def stop(self): """Called by timer thread at timeout. Raises a Timeout exception in the caller thread """ self.state = TimeoutContextState.TIMED_OUT async_raise(self.target_tid, TimeoutException)
# Required overrides def setup_interrupt(self): """Setting up the resource that interrupts the block """ self.timer = threading.Timer(self.seconds, self.stop) self.timer.start()
def suppress_interrupt(self): """Removing the resource that interrupts the block """ self.timer.cancel()

用例




往期推荐

《Python数据科学入门》

python实用脚本函数:邮件发送

《Python金融大数据分析 第2版》

流畅的python

爱上Python 一日精通Python编程

继续滑动看下一个

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存