Рассмотрим следующую unittest.TestCase
, которая реализует две версии одного и того же теста, с той лишь разницей, что один выполняет subTest
с параллельно, используя multiprocessing
.
import multiprocessing as mp
from unittest import TestCase
class TestBehaviour(TestCase):
def _test_equals(self, val):
for target_val in [1, 2]:
with self.subTest(target=target_val, source=val):
self.assertEqual(val, target_val)
def test_equality_parallel(self):
with mp.Pool(processes=4) as pool:
pool.map(self._test_equals, [1, 2])
pool.join()
pool.close()
def test_equality(self):
for val in [1, 2]:
self._test_equals(val)
Последовательная версия , test_equality
, работает как положено и выдает следующие тестовые сбои:
======================================================================
FAIL: test_equality (temp.TestBehaviour) (target=2, source=1)
----------------------------------------------------------------------
Traceback (most recent call last):
File "temp.py", line 11, in _test_equals
self.assertEqual(val, target_val)
AssertionError: 1 != 2
======================================================================
FAIL: test_equality (temp.TestBehaviour) (target=1, source=2)
----------------------------------------------------------------------
Traceback (most recent call last):
File "temp.py", line 11, in _test_equals
self.assertEqual(val, target_val)
AssertionError: 2 != 1
С другой стороны, test_equality_parallel
вызывает ошибку, поскольку TestCase
не может быть выбран:
Traceback (most recent call last):
File "temp.py", line 15, in test_equality_parallel
pool.map(self._test_equals, [1, 2])
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/pool.py", line 364, in map
return self._map_async(func, iterable, mapstar, chunksize).get()
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/pool.py", line 768, in get
raise self._value
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/pool.py", line 537, in _handle_tasks
put(task)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
TypeError: cannot pickle '_io.TextIOWrapper' object
Теперь я знаю, что могу выделить _test_equals
как отдельную функцию вне класса; тем не менее, я хотел бы сохранить поведение subTest
для обеспечения лучшей регистрации (и последующей отладки) сбоев тестов.
Как я могу запускать тесты параллельно, но сохранить эту функциональность subTest
?
Обновление
Я также пытался это сделать, используя pathos.multiprocessing.ProcessingPool
, чтобы обойти проблемы с сериализацией TestCase
; однако в этом случае pool.join()
поднимает ValueError: Pool is still running
.
from pathos.multiprocessing import ProcessingPool
...
def test_equality_parallel(self):
pool = ProcessingPool(processes=4)
pool.map(self._test_equals, [1, 2])
pool.join()
Обновление 2
Этот вопрос определенно актуален. Первое предложенное решение, создание второго класса для методов, вызываемых из дочернего процесса, не подходит, так как не позволяет использовать subTest
. Во-вторых, удаление непригодного для выбора объекта _Outcome
из TestCase
выглядит странным и, учитывая, что дочерние процессы работают subTests
, также выглядит непригодным.