Процесс Пул ненадежен, он либо зависает, либо работает нормально - PullRequest
0 голосов
/ 26 марта 2020

Здравствуйте и спасибо за вашу помощь :)

Программа, которую я писал, в основном разбирает исполняемые файлы. Я просто хотел посмотреть, смогу ли я сделать это быстрее, используя pathos. Проблема в том, что он не работает надежно. Через секунду я объясню, что я имею в виду под надежным .

. Программа запускается так:

from ControlFlow import Disassembler, DisassemblerWorker
from ControlFlow import FlowGraph
import networkx as nx
import time

file_path = "/vagrant/SimpleTestBinaries/example3-x64"
start = time.time()
flow = Disassembler(file_path)
graph = FlowGraph(flow)
end = time.time()

print("Finished in: ", end - start, " seconds")

Обычно она отвечает:

Finished in: 0.8992343274389473 seconds

Но иногда кажется, что он застрял. В конце концов, как вы можете видеть выше, это займет не более секунды. Поэтому я продолжаю убивать его, и он дает мне кучу ошибок, которые, возможно, намекают, где он застревает.

Process ForkPoolWorker-11:
Process ForkPoolWorker-13:
Process ForkPoolWorker-10:
Traceback (most recent call last):
Traceback (most recent call last):
Process ForkPoolWorker-12:
  File "/usr/local/lib/python3.6/dist-packages/multiprocess/process.py", line 258, in _bootstrap
    self.run()
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/multiprocess/process.py", line 258, in _bootstrap
    self.run()
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/multiprocess/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.6/dist-packages/multiprocess/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.6/dist-packages/multiprocess/pool.py", line 108, in worker
    task = get()
  File "/usr/local/lib/python3.6/dist-packages/multiprocess/queues.py", line 337, in get
    with self._rlock:
  File "/usr/local/lib/python3.6/dist-packages/multiprocess/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.6/dist-packages/multiprocess/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.6/dist-packages/multiprocess/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.6/dist-packages/multiprocess/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.6/dist-packages/multiprocess/pool.py", line 108, in worker
    task = get()
  File "/usr/local/lib/python3.6/dist-packages/multiprocess/synchronize.py", line 101, in __enter__
    return self._semlock.__enter__()
  File "/usr/local/lib/python3.6/dist-packages/multiprocess/pool.py", line 108, in worker
    task = get()
  File "/usr/local/lib/python3.6/dist-packages/multiprocess/pool.py", line 108, in worker
    task = get()
  File "/usr/local/lib/python3.6/dist-packages/multiprocess/queues.py", line 337, in get
    with self._rlock:
  File "/usr/local/lib/python3.6/dist-packages/multiprocess/synchronize.py", line 101, in __enter__
    return self._semlock.__enter__()
  File "/usr/local/lib/python3.6/dist-packages/multiprocess/queues.py", line 337, in get
    with self._rlock:
KeyboardInterrupt
  File "/usr/local/lib/python3.6/dist-packages/multiprocess/queues.py", line 338, in get
    res = self._reader.recv_bytes()
  File "/usr/local/lib/python3.6/dist-packages/multiprocess/synchronize.py", line 101, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
  File "/usr/local/lib/python3.6/dist-packages/multiprocess/connection.py", line 219, in recv_bytes
    buf = self._recv_bytes(maxlength)
KeyboardInterrupt
  File "/usr/local/lib/python3.6/dist-packages/multiprocess/connection.py", line 410, in _recv_bytes
    buf = self._recv(4)
  File "/usr/local/lib/python3.6/dist-packages/multiprocess/connection.py", line 382, in _recv
    chunk = read(handle, remaining)
KeyboardInterrupt
---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
 in 
      6 file_path = "/vagrant/SimpleTestBinaries/example3-x64"
      7 start = time.time()
----> 8 flow = Disassembler(file_path)
      9 graph = FlowGraph(flow)
     10 end = time.time()

/vagrant/BinaryResearch/ControlFlow.py in __init__(self, path)
     34         self.regs_write_map = {}
     35         self.section_map = {}
---> 36         self._analyze_flow()
     37 
     38     def disassembler_setup(self, architecture, details=True):

/vagrant/BinaryResearch/ControlFlow.py in _analyze_flow(self)
     77             jumps = p.amap(worker.get_jump_map, imagebase)
     78             returns = p.amap(worker.get_return_map, imagebase)
---> 79             p.close(), p.join()
     80 
     81             call_results, jump_results, return_results = calls.get()[0], jumps.get()[0], returns.get()[0]

/usr/local/lib/python3.6/dist-packages/pathos/multiprocessing.py in join(self)
    206         _pool = __STATE.get(self._id, None)
    207         if _pool and self.__nodes == _pool.__nodes:
--> 208             _pool.join()
    209         return
    210     # interface

/usr/local/lib/python3.6/dist-packages/multiprocess/pool.py in join(self)
    544         util.debug('joining pool')
    545         assert self._state in (CLOSE, TERMINATE)
--> 546         self._worker_handler.join()
    547         self._task_handler.join()
    548         self._result_handler.join()

/usr/lib/python3.6/threading.py in join(self, timeout)
   1054 
   1055         if timeout is None:
-> 1056             self._wait_for_tstate_lock()
   1057         else:
   1058             # the behavior of a negative timeout isn't documented, but

/usr/lib/python3.6/threading.py in _wait_for_tstate_lock(self, block, timeout)
   1070         if lock is None:  # already determined that the C code is done
   1071             assert self._is_stopped
-> 1072         elif lock.acquire(block, timeout):
   1073             lock.release()
   1074             self._stop()

KeyboardInterrupt: 

Итак, я пошел, чтобы проверить часть кода, на которую он ссылается. Я не знаю, означает ли это, что он застрял где-то между p.close() и p.join(). Это фрагмент внутри ControlFlow.py, на который он указывает.

from pathos.multiprocessing import ProcessPool

# More code ...

p = ProcessPool()

for section in available_sections:
    worker = DisassemblerWorker(self.architecture, section.content, section.virtual_address)

    p.clear()
    calls = p.amap(worker.get_call_map, imagebase)
    jumps = p.amap(worker.get_jump_map, imagebase)
    returns = p.amap(worker.get_return_map, imagebase)
    p.close(), p.join()

    call_results, jump_results, return_results = calls.get()[0], jumps.get()[0], returns.get()[0]

    # More code ...

Так что я действительно не знаю, что делает его ненадежным. Я знаю, это звучит безумно, но как только у программы первый «успех», она, кажется, работает нормально. Кроме того, я должен сказать, что я запускаю это в ноутбуке Jupyter. Я читал, что multiprocessing несовместим с ноутбуками, поэтому вместо него я использую multiprocess.

Есть идеи о том, что происходит?

Еще раз спасибо!

1 Ответ

0 голосов
/ 15 апреля 2020

У меня была та же проблема с базовой python многопроцессорной библиотекой (пулом): много раз работал, а иногда, кажется, зависал.

Документация указывает на необходимость иметь метод, который вы вызов с пулом вне вызова мультипроцессинг. https://docs.python.org/2/library/multiprocessing.html

На практике, если кажется, вам нужно определить свой метод вне if main. Я не знаю, почему поведение кажется непоследовательным, но следование руководству заставило его работать в некоторых случаях для меня.

Я нашел еще другие случаи, которые не работали, и в конечном итоге, кажется, написать методы вы распараллеливаете в отдельном файле. Я нашел этот последний бит в вопросе ниже, и это решило его для меня: Блокнот Jupyter никогда не заканчивает обработку с использованием многопроцессорной обработки (Python 3)

Ниже приведен пример использования вашей функции f в отдельном файле и вне основного.

**file 1: my_methods.py**
  def f(x):
  return x.count()



**file 2: main.py or your jupyter notebook, in the same directory here**

import multiprocessing as mp
from my_methods import f

def parallelize(df, func, n_cores=4):
    df_split = np.array_split(df, n_cores)
    pool = mp.Pool(n_cores)
    df = pd.concat(pool.map(func, df_split))
    pool.close()
    pool.join()
    return df   

if __name__ == '__main__':
  output = parallelize(
    df=chosen_input_dataframe,
    func = f,
    n_cores=4,
    )
   
   print(output)
...