Я использую многопоточный дизайн (не было выбора), но большая часть моего кода находится в одном потоке, где все события в нем управляются через очередь . Таким образом, большая часть моего кода ведет себя так, как если бы он был однопоточным, и мне не нужно беспокоиться о блокировках, семафорах и прочем.
Увы, я подошел к тому моменту, когда мне нужно протестировать мой код (во-первых, не пытайтесь не использовать TDDing), и я в растерянности - как вы тестируете что-то в другом потоке?
Например, скажем, у меня есть следующий класс:
class MyClass():
def __init__(self):
self.a=0
# register event to self.on_event
def on_some_event(self, b):
self.a += b
def get(self):
return self.a
и я хочу проверить:
import unittest
from queued_thread import ThreadedQueueHandler
class TestMyClass(unittest.TestCase):
def setUp(self):
# create the queued thread and assign the queue to self.queue
def test_MyClass(self):
mc = MyClass()
self.queue.put({'event_name':'some_event', 'val':1})
self.queue.put({'event_name':'some_event', 'val':2})
self.queue.put({'event_name':'some_event', 'val':3})
self.assertEqual(mc.get(),6)
if __name__ == '__main__':
unittest.main()
MyClass.get()
отлично работает для всего, находящегося в очереди, но тест будет вызываться асинхронно в главном потоке, поэтому результат может быть неверным!