Многопроцессорная обработка с генераторами в нескольких файлах и способы обхода TypeError («не могу выбрать объекты генератора») - PullRequest
1 голос
/ 11 марта 2019

Я пытаюсь обработать несколько файлов одновременно, причем каждый файл будет генерировать порции данных для одновременной подачи в очередь с определенным ограничением размера. Например, если есть 5 файлов, содержащих по 1 миллиону элементов каждый, я бы хотел передать 100 элементов из каждого из них в другой генератор, который выдает 500 элементов одновременно.

Вот то, что я пытался до сих пор, но сталкиваюсь с ошибкой can't pickle generator:

import os
from itertools import islice
import multiprocessing as mp
import numpy as np

class File(object):
    def __init__(self, data_params):
        data_len = 100000
        self.large_data = np.array([data_params + str(i) for i in np.arange(0, data_len)])
    def __iter__(self):
        for i in self.large_data:
            yield i

def parse_file(file_path):
    # differnt filepaths yeild different data obviously
    # here we just emulate with something silly
    if file_path == 'elephant_file':
        p = File(data_params = 'elephant')
    if file_path == 'number_file':
        p = File(data_params = 'number')
    if file_path == 'horse_file':
        p = File(data_params = 'horse')


    yield from p

def parse_dir(user_given_dir, chunksize = 10):
    pool = mp.Pool(4)
    paths = ['elephant_file', 'number_file', 'horse_file'] #[os.path.join(user_given_dir, p) for p in os.listdir(user_given_dir)]

    # Works, but not simultaneously on all paths
#     for path in paths:
#         data_gen = parse_file(path)
#         parsed_data_batch = True
#         while parsed_data_batch:
#             parsed_data_batch = list(islice(data_gen, chunksize))
#             yield parsed_data_batch

    # Doesn't work
    for objs in pool.imap(parse_file, paths, chunksize = chunksize):
        for o in objs:
            yield o

it = parse_dir('.')
for ix, o in enumerate(it):
    print(o) # hopefully just prints 10 elephants, horses and numbers
    if ix>2: break

Кто-нибудь имеет представление о том, как получить желаемое поведение?

1 Ответ

0 голосов
/ 11 марта 2019
  1. Для ошибки рассола:

    parse_file - это генератор, а не обычная функция, поскольку он использует yield внутри.

    И multiprocessing требуетфункция как задача для выполнения.Поэтому вы должны заменить yield from p на return p в parse_file()

  2. Если вы хотите получать записи в чанках из всех файлов по одному, попробуйте использовать zip в parse_dir():

    iterators = [
        iter(e) for e in pool.imap(parse_file, paths, chunksize=chunksize)
    ]
    
    while True:
        batch = [
            o for i in iterators
            for _, o in zip(range(100), i)  # e.g., 100
        ]
       if batch:
            yield batch
        else:
            return
    
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...