У меня есть каталог с CSV-файлами -> 'data'
Я загружаю каждый CSV в dataframe как итератор с параметром chunksize -> inner_it
В результате получается список итераторов -> ll
Я хочу загрузить все чанки из каждого inner_it в очередь.
Как это сделать чистым способом?
В настоящее время я делаю:
import os
import pandas as pd
def sample_gen(df):
yield next(df)
def get_next(df, qq):
try:
while True:
z = next(df)
print(z.shape)
except StopIteration:
pass
finally:
qq.append(z)
return qq
ll = iter([pd.read_csv(os.path.join(f'data/{x}'), chunksize=10**6) for x in os.listdir('data')])
qq = []
def load_queue(ll, qq):
try:
inner_it = next(ll)
qq = get_next(inner_it, qq)
except StopIteration:
load_queue(ll, qq)
finally:
return qq, ll
Я не знаю, как работать load_queue
EDIT:
Я решил сгладить свой список итераторов и использовать вместо них генераторы. Вот мое окончательное решение ниже:
import os
import threading
import concurrent.futures
import queue
import time
import pandas as pd
def producer(queue, event):
ll = (pd.read_csv(os.path.join(f'data/{x}'), chunksize=10 ** 6) for x in os.listdir('data'))
ll = (chunk for each_iterator in ll for chunk in each_iterator)
while True:
try:
message = next(ll)
queue.put(message, "P")
except Exception as ex:
print(ex)
event.set()
break
print('producer got exit event')
def consumer(queue, event):
while not event.is_set():
message = queue.get()
print(message.shape, 'C')
print('consumer got exit event')
if __name__ == '__main__':
pipeline = queue.Queue(maxsize=10)
event = threading.Event()
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
executor.submit(producer, pipeline, event)
executor.submit(consumer, pipeline, event)