Python многопроцессорная обработка для подготовки набора данных - PullRequest
1 голос
/ 28 февраля 2020

Я ищу более короткие способы подготовки набора данных для задачи машинного обучения. Я обнаружил, что многопроцессорная библиотека может быть полезной. Однако, поскольку я новичок ie в многопроцессорной обработке, я не мог найти правильный путь.

Сначала я написал несколько кодов, как показано ниже:

class DatasetReader:
    def __init__(self):
        self.data_list = Read_Data_from_file
        self.data = []

    def _ready_data(self, ex, idx):
        # Some complex functions that takes several minutes

    def _dataset_creator(self, queue):
        for idx, ex in enumerate(self.data_list):
            queue.put(self._ready_data(ex, idx))

    def _dataset_consumer(self, queue):
        total_mem = 0.0
        t = tqdm(range(self.num_data), total=self.num_data, desc='Building Dataset ', bar_format='{desc}:{percentage:3.0f}% ({n_fmt}/{total_fmt}) [{elapsed}<{remaining},{rate_fmt}{postfix}]')
        for idx in t:
            ins = queue.get()
            self.data.append(ins)
            gc.collect()

    def _build_dataset(self):
        queue = Queue()
        creator = Process(target=self._dataset_creator, args=(queue,))
        consumer = Process(target=self._dataset_consumer, args=(queue,))
        creator.start()
        consumer.start()

        queue.close()
        queue.join_thread()

        creator.join()
        consumer.join()

Однако, на мой взгляд, поскольку _dataset_creator обрабатывает данные (здесь _ready_data) последовательным образом , это не поможет сократить потребление времени.

Итак, я изменил код для генерации нескольких процессов, которые обрабатывают один элемент данных:

class DatasetReader:
    def __init__(self):
        self.data_list = Read_Data_from_file
        self.data = []

    def _ready_data(self, ex, idx):
        # Some complex functions that takes several minutes

    def _dataset_creator(self, ex, idx, queue):
        queue.put(self._ready_data(ex, idx))

    def _dataset_consumer(self, queue):
        total_mem = 0.0
        t = tqdm(range(self.num_data), total=self.num_data, desc='Building Dataset ', bar_format='{desc}:{percentage:3.0f}% ({n_fmt}/{total_fmt}) [{elapsed}<{remaining},{rate_fmt}{postfix}]')
        for idx in t:
            ins = queue.get()
            self.data.append(ins)
            gc.collect()

    def _build_dataset(self):
        queue = Queue()
        for idx, ex in enumerate(self.data_list):
            p = Process(target=self._dataset_creator, args=(ex, idx, queue,))
            p.start()
        consumer = Process(target=self._dataset_consumer, args=(queue,))
        consumer.start()

        queue.close()
        queue.join_thread()

        consumer.join()

Однако это возвращает мне ошибки:

Process Process-18:  
Traceback ~~~  
RuntimeError: can't start new thread  
Traceback ~~~  
OSError: [Errno 12] Cannot allocate memory  

Не могли бы вы мне помочь? обрабатывать сложные данные параллельно?

РЕДАКТИРОВАТЬ 1:

Благодаря @tdelaney я могу сократить потребление времени, генерируя self.num_worker процессов (16 в моем эксперименте):

    def _dataset_creator(self, pid, queue):
        for idx, ex in list(enumerate(self.data_list))[pid::self.num_worker]:
            queue.put(self._ready_data(ex, idx))

    def _dataset_consumer(self, queue):
        t = tqdm(range(self.num_data), total=self.num_data, desc='Building Dataset ', bar_format='{desc}:{percentage:3.0f}% ({n_fmt}/{total_fmt}) [{elapsed}<{remaining},{rate_fmt}{postfix}]')
        for _ in t:
            ins = queue.get()
            self.data[ins['idx']] = ins

    def _build_dataset(self):
        queue = Queue()
        procs = []
        for pid in range(self.num_worker):
            p = Process(target=self._dataset_creator, args=(pid, queue,))
            procs.append(p)
            p.start()
        consumer = Process(target=self._dataset_consumer, args=(queue,))
        consumer.start()

        queue.close()
        queue.join_thread()

        for p in procs:
            p.join()
        consumer.join()

1 Ответ

1 голос
/ 28 февраля 2020

Я пытаюсь набросать, как будет выглядеть решение с многопроцессорным пулом. Я полностью избавился от потребительского процесса, потому что похоже, что родительский процесс все равно ждет (и ему нужны данные), так что он может быть потребителем. Итак, я настроил пул и использую imap_unordered для обработки передачи данных работнику.

Я догадался, что для обработки данных вообще не нужен DatasetReader, и переместил его в свою собственную функцию. , В Windows либо весь объект DataReader сериализуется в подпроцесс (включая данные, которые вам не нужны), либо дочерняя версия объекта является неполной и может создать sh при попытке его использовать.

В любом случае, изменения, внесенные в объект DatasetReader в дочерних процессах, не видны в родительском. Это может быть неожиданным, если родительский объект зависит от обновленного состояния в этом объекте. На мой взгляд, лучше строго ограничить то, что происходит в подпроцессах.

from multiprocessing import Pool, get_start_method, cpu_count

# moved out of class (assuming it is not class dependent) so that
# the entire DatasetReader object isn't pickled and sent to
# the child on spawning systems like Microsoft Windows

def _ready_data(idx_ex):
    idx, ex = idx_ex
    # Some complex functions that take several minutes
    result = complex_functions(ex)
    return (idx, result)


class DatasetReader:

    def __init__(self):
        self.data_list = Read_Data_from_file
        self.data = [None] * len(data_list)

    def _ready_data_fork(self, idx):
        # on forking system, call worker with object data
        return _ready_data((idx, self.data_list[idx]))

    def run(self):

        t = tqdm(range(self.num_data), total=self.num_data, desc='Building Dataset ',
            bar_format='{desc}:{percentage:3.0f}% ({n_fmt}/{total_fmt}) '
                '[{elapsed}<{remaining},{rate_fmt}{postfix}]')

        pool = Pool(min(cpu_count, len(self.data_list)))
        if get_start_method() == 'fork':
            # on forking system, self.data_list is in child process and
            # we only pass the index
            result_iter = pool.imap_unordered(self._ready_data_fork, 
                    (idx for idx in range(len(data_list))),
                    chunksize=1)
        else:
            # on spawning system, we need to pass the data
            result_iter = pool.imap_unordered(_ready_data,
                    enumerate(self.data_list,
                    chunksize=1)

        for idx, result in result_iter:
            next(t)
            self.data[idx] = result

        pool.join()
...