итератор Python из списка итераторов в очередь блоков данных из внутренних итераторов - PullRequest
0 голосов
/ 05 июня 2019

У меня есть каталог с CSV-файлами -> 'data' Я загружаю каждый CSV в dataframe как итератор с параметром chunksize -> inner_it В результате получается список итераторов -> ll Я хочу загрузить все чанки из каждого inner_it в очередь. Как это сделать чистым способом?

В настоящее время я делаю:

import os

import pandas as pd


def sample_gen(df):
    yield next(df)


def get_next(df, qq):
    try:
        while True:
            z = next(df)
            print(z.shape)
    except StopIteration:
        pass
    finally:
        qq.append(z)
        return qq


ll = iter([pd.read_csv(os.path.join(f'data/{x}'), chunksize=10**6) for x in os.listdir('data')])
qq = []


def load_queue(ll, qq):
    try:
        inner_it = next(ll)
        qq = get_next(inner_it, qq)
    except StopIteration:
        load_queue(ll, qq)
    finally:
        return qq, ll

Я не знаю, как работать load_queue

EDIT: Я решил сгладить свой список итераторов и использовать вместо них генераторы. Вот мое окончательное решение ниже:

import os
import threading
import concurrent.futures
import queue
import time
import pandas as pd


def producer(queue, event):
    ll = (pd.read_csv(os.path.join(f'data/{x}'), chunksize=10 ** 6) for x in os.listdir('data'))
    ll = (chunk for each_iterator in ll for chunk in each_iterator)

    while True:
        try:
            message = next(ll)
            queue.put(message, "P")
        except Exception as ex:
            print(ex)
            event.set()
            break
    print('producer got exit event')


def consumer(queue, event):
    while not event.is_set():
        message = queue.get()
        print(message.shape, 'C')
    print('consumer got exit event')


if __name__ == '__main__':
    pipeline = queue.Queue(maxsize=10)
    event = threading.Event()

    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline, event)
        executor.submit(consumer, pipeline, event)
...