"""
Source code for hbp_nrp_commons.killable_threads.

This module contains a killable Python thread implementation.
"""
import threading
import inspect
import ctypes

# Module authorship metadata.
__author__ = "Tomer Filiba, Georg Hinkel"

def _async_raise(tid, exctype):
    """
    Asynchronously raise an exception type in the thread with the given id.

    The request is made through the CPython C API function
    ``PyThreadState_SetAsyncExc``; if the API reports that more than one
    thread state was affected, the request is reverted before failing.

    :param tid: identifier of the target thread
    :param exctype: the exception class (not an instance) to raise
    :raises TypeError: if ``exctype`` is not a class
    :raises ValueError: if the API reports that no thread matched ``tid``
    :raises SystemError: if the API reports more than one affected thread
    """
    if not inspect.isclass(exctype):
        raise TypeError("Only types can be raised (not instances)")
    # Wrap the id once and reuse it for every call: ctypes passes a bare
    # Python int as a C int, which would truncate thread ids that are wider
    # than an int.
    thread_id = ctypes.c_long(tid)
    set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc
    res = set_async_exc(thread_id, ctypes.py_object(exctype))
    if res == 0:
        raise ValueError("invalid thread id")
    if res != 1:
        # More than one thread state was modified: call again with a NULL
        # exception (None is passed as a NULL pointer) to revert the effect.
        set_async_exc(thread_id, None)
        raise SystemError("PyThreadState_SetAsyncExc failed")

class Thread(threading.Thread):
    """
    This class represents a thread that can be terminated
    """

    def __init__(self, *args, **kwargs):
        """
        Create the thread; all arguments are forwarded to threading.Thread
        """
        super(Thread, self).__init__(*args, **kwargs)
        # Cached identifier of this thread, filled in on first lookup
        self.__thread_id = None

    def _get_my_tid(self):
        """
        Determines this (self's) thread id

        :return: the identifier of this thread
        :raises threading.ThreadError: if the thread is not alive
        :raises AssertionError: if no identifier is available for the thread
        """
        # is_alive() replaces the camelCase isAlive(), which no longer
        # exists on Python 3.9 and later
        if not self.is_alive():
            raise threading.ThreadError("the thread is not active")

        # do we have it cached?
        if self.__thread_id is not None:
            return self.__thread_id

        # no, ask the public ident attribute instead of scanning the
        # private threading._active dict
        tid = self.ident
        if tid is None:
            raise AssertionError("could not determine the thread's id")
        self.__thread_id = tid
        return tid

    def raise_exc(self, exctype):
        """
        Raises the given exception type in the context of this thread

        :param exctype: the exception class to raise in this thread
        """
        _async_raise(self._get_my_tid(), exctype)

    def terminate(self):
        """
        Raises SystemExit in the context of this thread, which should cause
        the thread to exit silently (unless caught)
        """
        self.raise_exc(SystemExit)