Я работаю над подклассом threading.Thread
, который позволяет вызывать и запускать его методы в потоке, представленном объектом, к которому они обращаются, в отличие от обычного поведения. Я делаю это, используя декораторы в целевом методе, которые помещают вызов метода в collections.deque
, и используя метод run
для обработки deque.
метод run
использует оператор while not self.__stop:
и объект threading.Condition
, чтобы дождаться вызова в очереди и затем вызвать self.__process_calls
. Часть else
цикла while
делает последний вызов __process_calls
. если self.__stop
, исключение возникает при любых попытках вызвать один из 'вызываемых' методов из другого потока.
Проблема в том, что __process_calls
не может быть возвращен, если последнее утверждение не является print
, которое я обнаружил во время отладки. Я пробовал a = 1
и явный return
, но ни одна из них не работает. с любым оператором print
как последним оператором функции, он возвращается, и поток не зависает. Есть идеи, что происходит?
РЕДАКТИРОВАТЬ: Давид Заславский указал, что печать работает, потому что это занимает некоторое время
и я подтвердил, что
Код немного длинный, но, надеюсь, мое объяснение выше достаточно ясно, чтобы помочь понять его.
import threading
import collections
class BrokenPromise(Exception): pass
class CallableThreadError(Exception): pass
class CallToNonRunningThreadError(CallableThreadError): pass
class Promise(object):
def __init__(self, deque, condition):
self._condition = condition
self._deque = deque
def read(self, timeout=None):
if not self._deque:
with self._condition:
if timeout:
self._condition.wait(timeout)
else:
self._condition.wait()
if self._deque:
value = self._deque.popleft()
del self._deque
del self._condition
return value
else:
raise BrokenPromise
def ready(self):
return bool(self._deque)
class CallableThread(threading.Thread):
def __init__(self, *args, **kwargs):
# _enqueued_calls is used to store tuples that encode a function call.
# It is processed by the run method
self.__enqueued_calls = collections.deque()
# _enqueue_call_permission is for callers to signal that they have
# placed something on the queue
self.__enqueue_call_permission = threading.Condition()
self.__stop = False
super(CallableThread, self).__init__(*args, **kwargs)
@staticmethod
def blocking_method(f):
u"""A decorator function to implement a blocking method on a thread"""
# the returned function enqueues the decorated function and blocks
# until the decorated function# is called and returns. It then returns
# the value unmodified. The code in register runs in the calling thread
# and the decorated method runs in thread that it is called on
f = CallableThread.nonblocking_method_with_promise(f)
def register(self, *args, **kwargs):
p = f(self, *args, **kwargs)
return p.read()
return register
@staticmethod
def nonblocking_method_with_promise(f):
u"""A decorator function to implement a non-blocking method on a
thread
"""
# the returned function enqueues the decorated function and returns a
# Promise object.N The code in register runs in the calling thread
# and the decorated method runs in thread that it is called on.
def register(self, *args, **kwargs):
call_complete = threading.Condition()
response_deque = collections.deque()
self.__push_call(f, args, kwargs, response_deque, call_complete)
return Promise(response_deque, call_complete)
return register
@staticmethod
def nonblocking_method(f):
def register(self, *args, **kwargs):
self.__push_call(f, args, kwargs)
return register
def run(self):
while not self.__stop: # while we've not been killed
with self.__enqueue_call_permission:
# get the condition so that we can wait on it if we need too.
if not self.__enqueued_calls:
self.__enqueue_call_permission.wait()
self.__process_calls()
else:
# if we exit because self._run == False, finish processing
# the pending calls if there are any
self.__process_calls()
def stop(self):
u""" Signal the thread to stop"""
with self.__enqueue_call_permission:
# we do this in case the run method is stuck waiting on an update
self.__stop = True
self.__enqueue_call_permission.notify()
def __process_calls(self):
print "processing calls"
while self.__enqueued_calls:
((f, args, kwargs),
response_deque, call_complete) = self.__enqueued_calls.popleft()
if call_complete:
with call_complete:
response_deque.append(f(self, *args, **kwargs))
call_complete.notify()
else:
f(self, *args, **kwargs)
# this is where you place the print statement if you want to see the
# behavior
def __push_call(self, f, args, kwargs, response_deque=None,
call_complete=None):
if self.__stop:
raise CallToNonRunningThreadError(
"This thread is no longer accepting calls")
with self.__enqueue_call_permission:
self.__enqueued_calls.append(((f, args, kwargs),
response_deque, call_complete))
self.__enqueue_call_permission.notify()
#if __name__=='__main__': i lost the indent on the following code in copying but
#it doesn't matter in this context
class TestThread(CallableThread):
u"""Increment a counter on each call and print the value"""
counter = 0
@CallableThread.nonblocking_method_with_promise
def increment(self):
self.counter += 1
return self.counter
class LogThread(CallableThread):
@CallableThread.nonblocking_method
def log(self, message):
print message
l = LogThread()
l.start()
l.log("logger started")
t = TestThread()
t.start()
l.log("test thread started")
p = t.increment()
l.log("promise aquired")
v = p.read()
l.log("promise read")
l.log("{0} read from promise".format(v))
l.stop()
t.stop()
l.join()
t.join()