Зависание в многопроцессорной очереди - PullRequest
0 голосов
/ 21 февраля 2020

Я пытаюсь использовать multiprocessing.Queue, чтобы разделить файл на несколько файлов меньшего размера. Хотя приведенный ниже код работает часто, иногда он будет зависать:

Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/home/pedroq/miniconda3/envs/drax_annot/lib/python3.7/multiprocessing/util.py", line 265, in _run_finalizers
    finalizer()
  File "/home/pedroq/miniconda3/envs/drax_annot/lib/python3.7/multiprocessing/util.py", line 189, in __call__
    res = self._callback(*self._args, **self._kwargs)
  File "/home/pedroq/miniconda3/envs/drax_annot/lib/python3.7/multiprocessing/queues.py", line 192, in _finalize_join
    thread.join()
  File "/home/pedroq/miniconda3/envs/drax_annot/lib/python3.7/threading.py", line 1032, in join
    self._wait_for_tstate_lock()
  File "/home/pedroq/miniconda3/envs/drax_annot/lib/python3.7/threading.py", line 1048, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt

Я не уверен, почему? Данные, передаваемые процессам, могут быть довольно большими, может ли это быть проблемой? Вот псевдокод, который я использую:

def generate_split_processes_to_run(self,protein_seqs,seq_chunks):
    c=0
    for chunk in seq_chunks:
        self.queue.put([protein_seqs,chunk,c])
        c+=1

def sample_split_handler(self,protein_seqs,protein_seqs_groups,worker_count):
    #loading the queue
    self.generate_split_processes_to_run(protein_seqs,protein_seqs_groups)
    #spawning the processes
    processes = [Process(target=self.sample_split_worker, args=(self.queue,)) for _ in range(worker_count)]
    #starting the processes
    for process in processes:
        process.start()
    #joining processes
    for process in processes:
        process.join()
        print(processes)


def sample_split_worker(self, queue):
    while not queue.empty():
        seqs, chunk, chunk_number= queue.get()
        self.save_chunks(seqs, chunk, chunk_number)

def split_sample(self):
    seqs=self.read_file(self.target_path)
    seqs_keys=list(seqs.keys())
    worker_count= 7
    seq_chunks= chunk_generator(seqs_keys, 1000)
    self.sample_split_handler(seqs,seq_chunks,worker_count)

def save_chunks(self,seqs,
                      chunk,
                      chunk_number):
    with open(chunk_path, 'w+') as file:
        while chunk:
            seq_id = chunk.pop(0)
            chunk_str = 'something'
            file.write('>' + seq_id + '\n' + chunk_str + '\n')

Когда я печатаю список процессов, все они выглядят завершенными:

[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>, <Process(Process-6, stopped)>, <Process(Process-7, stopped)>]

Я использовал Pool, который работал нормально, но Я хотел бы использовать очередь сейчас. Любая помощь будет приветствоваться!

1 Ответ

0 голосов
/ 23 февраля 2020

Итак, по-видимому, это несколько «распространенная» проблема. Проверка размера очереди с помощью .empty() или .qsize() не гарантирует, что очередь действительно пуста. В быстрых процессах вы часто можете получить queue.empty()==True даже при том, что очередь не пуста.

Есть несколько предложенных решений для этого:
1- Включите таймер sleep() между процессами, чтобы очередь есть время, чтобы получить новые предметы.
2- Добавить стража в очередь.

Часовой гарантирует, что вы всегда закончите sh очередь, в отличие от опции таймера. После вставки элементов в очередь добавьте часового, например, queue.put(None) (None для низкого потребления памяти). Вставьте None для каждого процесса, который вы запускаете. Это приведет к чему-то вроде этого:

def sample_split_worker(self, queue):
    while True:
        #when the queue is finished, each process will receive a None, thus breaking the cycle.
        record = queue.get()
        If not record: break
        seqs, chunk, chunk_number = record
        self.save_chunks(seqs, chunk, chunk_number)

Надеюсь, это кому-нибудь поможет.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...