Как выполнить python подстесты параллельно? - PullRequest
1 голос
/ 12 марта 2020

Рассмотрим следующую unittest.TestCase, которая реализует две версии одного и того же теста, с той лишь разницей, что один выполняет subTest с параллельно, используя multiprocessing.

import multiprocessing as mp
from unittest import TestCase


class TestBehaviour(TestCase):
    def _test_equals(self, val):
        for target_val in [1, 2]:
            with self.subTest(target=target_val, source=val):
                self.assertEqual(val, target_val)

    def test_equality_parallel(self):
        with mp.Pool(processes=4) as pool:
            pool.map(self._test_equals, [1, 2])
            pool.join()
            pool.close()

    def test_equality(self):
        for val in [1, 2]:
            self._test_equals(val)

Последовательная версия , test_equality, работает как положено и выдает следующие тестовые сбои:

======================================================================
FAIL: test_equality (temp.TestBehaviour) (target=2, source=1)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "temp.py", line 11, in _test_equals
    self.assertEqual(val, target_val)
AssertionError: 1 != 2

======================================================================
FAIL: test_equality (temp.TestBehaviour) (target=1, source=2)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "temp.py", line 11, in _test_equals
    self.assertEqual(val, target_val)
AssertionError: 2 != 1

С другой стороны, test_equality_parallel вызывает ошибку, поскольку TestCase не может быть выбран:

Traceback (most recent call last):
  File "temp.py", line 15, in test_equality_parallel
    pool.map(self._test_equals, [1, 2])
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/pool.py", line 364, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/pool.py", line 768, in get
    raise self._value
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/pool.py", line 537, in _handle_tasks
    put(task)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: cannot pickle '_io.TextIOWrapper' object

Теперь я знаю, что могу выделить _test_equals как отдельную функцию вне класса; тем не менее, я хотел бы сохранить поведение subTest для обеспечения лучшей регистрации (и последующей отладки) сбоев тестов.

Как я могу запускать тесты параллельно, но сохранить эту функциональность subTest?

Обновление

Я также пытался это сделать, используя pathos.multiprocessing.ProcessingPool, чтобы обойти проблемы с сериализацией TestCase; однако в этом случае pool.join() поднимает ValueError: Pool is still running.

from pathos.multiprocessing import ProcessingPool
...
    def test_equality_parallel(self):                                           
        pool = ProcessingPool(processes=4)                                      
        pool.map(self._test_equals, [1, 2])
        pool.join()

Обновление 2

Этот вопрос определенно актуален. Первое предложенное решение, создание второго класса для методов, вызываемых из дочернего процесса, не подходит, так как не позволяет использовать subTest. Во-вторых, удаление непригодного для выбора объекта _Outcome из TestCase выглядит странным и, учитывая, что дочерние процессы работают subTests, также выглядит непригодным.

1 Ответ

1 голос
/ 15 марта 2020

Я автор pathosdill и multiprocess). Вы все еще видите ошибку сериализации из pathos между процессами. Вы можете попробовать сериализацию между потоками. Параллельная нить, вероятно, подходит для этого уровня функции.

import multiprocess.dummy as mp
from unittest import TestCase


class TestBehaviour(TestCase):
    def _test_equals(self, val):
        for target_val in [1, 2]:
            with self.subTest(target=target_val, source=val):
                self.assertEqual(val, target_val)

    def test_equality_parallel(self):
        with mp.Pool(processes=4) as pool:
            pool.map(self._test_equals, [1, 2])
            pool.join()
            pool.close()

    def test_equality(self):
        for val in [1, 2]:
            self._test_equals(val)

Что дает:

======================================================================
FAIL: test_equality (test_equaltiy.TestBehaviour)
----------------------------------------------------------------------
...[snip]...
AssertionError: 1 != 2

======================================================================
FAIL: test_equality_parallel (test_equaltiy.TestBehaviour)
----------------------------------------------------------------------
...[snip]...
AssertionError: 1 != 2

----------------------------------------------------------------------
Ran 2 tests in 0.108s

FAILED (failures=2)

Это говорит о том, что вы можете использовать вариант сериализации из dill (то есть в dill.settings), чтобы обойти сериализацию вопрос. См .: https://github.com/uqfoundation/multiprocess/issues/48.

...