Получение ошибочных исключений времени выполнения при попытке доступа к постоянным данным в многопроцессорной среде. Рабочие процессы пула - PullRequest
0 голосов
/ 12 декабря 2018

Вдохновленный этим решением Я пытаюсь настроить многопроцессорный пул рабочих процессов в Python.Идея состоит в том, чтобы передать некоторые данные рабочим процессам до того, как они действительно начнут свою работу, и в конце концов повторно использовать их.Он предназначен для минимизации объема данных, которые необходимо упаковать / распаковать для каждого вызова в рабочий процесс (т. Е. Снизить накладные расходы на межпроцессное взаимодействие).Мой MCVE выглядит следующим образом:

import multiprocessing as mp
import numpy as np

def create_worker_context():
    global context # create "global" context in worker process
    context = {}

def init_worker_context(worker_id, some_const_array, DIMS, DTYPE):
    context.update({
        'worker_id': worker_id,
        'some_const_array': some_const_array,
        'tmp': np.zeros((DIMS, DIMS), dtype = DTYPE),
        }) # store context information in global namespace of worker
    return True # return True, verifying that the worker process received its data

class data_analysis:
    def __init__(self):
        self.DTYPE = 'float32'
        self.CPU_LEN = mp.cpu_count()
        self.DIMS = 100
        self.some_const_array = np.zeros((self.DIMS, self.DIMS), dtype = self.DTYPE)
        # Init multiprocessing pool
        self.cpu_pool = mp.Pool(processes = self.CPU_LEN, initializer = create_worker_context) # create pool and context in workers
        pool_results = [
            self.cpu_pool.apply_async(
                init_worker_context,
                args = (core_id, self.some_const_array, self.DIMS, self.DTYPE)
            ) for core_id in range(self.CPU_LEN)
            ] # pass information to workers' context
        result_batches = [result.get() for result in pool_results] # check if they got the information
        if not all(result_batches): # raise an error if things did not work
            raise SyntaxError('Workers could not be initialized ...')

    @staticmethod
    def process_batch(batch_data):
        context['tmp'][:,:] = context['some_const_array'] + batch_data # some fancy computation in worker
        return context['tmp'] # return result

    def process_all(self):
        input_data = np.arange(0, self.DIMS ** 2, dtype = self.DTYPE).reshape(self.DIMS, self.DIMS)
        pool_results = [
            self.cpu_pool.apply_async(
                data_analysis.process_batch,
                args = (input_data,)
            ) for _ in range(self.CPU_LEN)
            ] # let workers actually work
        result_batches = [result.get() for result in pool_results]
        for batch in result_batches[1:]:
            np.add(result_batches[0], batch, out = result_batches[0]) # reduce batches
        print(result_batches[0]) # show result

if __name__ == '__main__':
    data_analysis().process_all()

Я запускаю вышеупомянутое с CPython 3.6.6.

Странная вещь ... иногда это работает, иногда нет.Если это не работает, метод process_batch выдает исключение, потому что он не может найти some_const_array в качестве ключа в context.Полный возврат выглядит следующим образом:

(env) me@box:/path> python so.py 
multiprocessing.pool.RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/python3.6/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "/path/so.py", line 37, in process_batch
    context['tmp'][:,:] = context['some_const_array'] + batch_data # some fancy computation in worker
KeyError: 'some_const_array'
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/path/so.py", line 54, in <module>
    data_analysis().process_all()
  File "/path/so.py", line 48, in process_all
    result_batches = [result.get() for result in pool_results]
  File "/path/so.py", line 48, in <listcomp>
    result_batches = [result.get() for result in pool_results]
  File "/python3.6/multiprocessing/pool.py", line 644, in get
    raise self._value
KeyError: 'some_const_array'

Я озадачен.Что здесь происходит?

Если в моих словарях context содержится объект "более высокого типа", например, драйвер базы данных или аналогичный, я не получаю такой проблемы.Я могу воспроизвести это, только если мои context словари содержат базовые типы данных Python, коллекции или массивы.

(Существует ли потенциально лучший подход для достижения того же результата более надежным способом? Я знаю, что мой подход считается взломать ...)

1 Ответ

0 голосов
/ 12 декабря 2018

Вам необходимо переместить содержимое init_worker_context в вашу initializer функцию create_worker_context.

Ваше предположение, что каждый отдельный рабочий процесс будет запускаться init_worker_context отвечает затвое замешательство.Задачи, которые вы отправляете в пул, попадают в одну внутреннюю очередь задач, из которой считываются все рабочие процессы.В вашем случае происходит то, что некоторые рабочие процессы выполняют свою задачу и снова соревнуются за получение новых задач.Так что может случиться так, что один рабочий процесс выполнит несколько задач, а другой не получит ни одной.Помните, что ОС планирует время выполнения для потоков (рабочих процессов).

...