Ошибка при использовании pexpect и многопроцессорности? ошибка «Ошибка типа: невозможно сериализовать объект« _io.TextIOWrapper »» - PullRequest
0 голосов
/ 08 ноября 2019

У меня есть скрипт Python 3.7 на компьютере с Linux, где я пытаюсь запустить функцию в нескольких потоках, но при попытке получить следующую ошибку:

Traceback (most recent call last):
  File "./test2.py", line 43, in <module>
    pt.ping_scanx()
  File "./test2.py", line 39, in ping_scanx
    par = Parallel(function=self.pingx, parameter_list=list, thread_limit=10)
  File "./test2.py", line 19, in __init__
    self._x = self._pool.starmap(function, parameter_list, chunksize=1)
  File "/usr/local/lib/python3.7/multiprocessing/pool.py", line 276, in starmap
    return self._map_async(func, iterable, starmapstar, chunksize).get()
  File "/usr/local/lib/python3.7/multiprocessing/pool.py", line 657, in get
    raise self._value
  File "/usr/local/lib/python3.7/multiprocessing/pool.py", line 431, in _handle_tasks
    put(task)
  File "/usr/local/lib/python3.7/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/local/lib/python3.7/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: cannot serialize '_io.TextIOWrapper' object

Это пример кода, которыйЯ использую, чтобы продемонстрировать проблему:

#!/usr/local/bin/python3.7
from multiprocessing import Pool
import pexpect   # Used to run SSH for sessions

class Parallel:

    def __init__(self, function, parameter_list, thread_limit=4):

        # Create new thread to hold our jobs
        self._pool = Pool(processes=thread_limit)

        self._x = self._pool.starmap(function, parameter_list, chunksize=1)

class PingTest():

    def __init__(self):
        self._pex = None

    def connect(self):
        self._pex = pexpect.spawn("ssh snorton@127.0.0.1")

    def pingx(self, target_ip, source_ip):
        print("PING {} {}".format(target_ip, source_ip))

    def ping_scanx(self):

        self.connect()

        list = [['8.8.8.8', '96.53.16.93'],
                ['8.8.8.8', '96.53.16.93']]

        par = Parallel(function=self.pingx, parameter_list=list, thread_limit=10)


pt = PingTest()
pt.ping_scanx()

Если я не включу строку с pexpect.spawn, ошибка не произойдет. Может кто-нибудь объяснить, почему я получаю ошибку, и предложить способ ее исправить?

1 Ответ

0 голосов
/ 08 ноября 2019

С multiprocessing.Pool вы фактически вызываете функцию как отдельные процессы, а не как потоки. Процессы не могут совместно использовать объекты Python до тех пор, пока они не будут сначала сериализованы, прежде чем передавать их друг другу по каналам связи между процессами, что и делает multiprocessing.Pool для вас за сценой, используя pickle в качестве сериализатора. Поскольку pexpect.spawn открывает оконечное устройство как файлоподобный объект TextIOWrapper, и вы сохраняете возвращаемый объект в экземпляре PingTest, а затем передаете связанный метод self.pingx в Pool.starmap, он попытаетсяserialize self, который содержит объект pexpect.spawn в атрибуте _pex, который, к сожалению, не может быть сериализован, поскольку TextIOWrapper не поддерживает сериализацию.

Поскольку ваша функция связана с вводом / выводом, вам следуетвместо этого используйте многопоточность через модуль multiprocessing.dummy для более эффективного распараллеливания и, что более важно, в этом случае, чтобы разрешить общий доступ к объекту pexpect.spawn между потоками без необходимости сериализации.

Изменение:

from multiprocessing import Pool

на:

from multiprocessing.dummy import Pool

Демонстрация: https://repl.it/@blhsing/WiseYoungExperiments

...