Мне нужно обработать огромный файл размером около 30 ГБ, содержащий сотни миллионов строк. Точнее, я хочу выполнить три следующих шага:
Чтение файла по частям: учитывая размер файла, у меня нет памяти для чтения файла в одном go;
Вычисление содержимого чанков перед агрегацией каждого из них до более управляемого размера;
Объединение агрегированных чанков в окончательный набор данных содержащий результаты моих анализов.
До сих пор я кодировал два потока:
- Один поток отвечает за чтение файла чанками и сохранение чанки в очереди (шаг 1);
- Один поток отвечает за выполнение анализа (шаг 2) чанков;
Вот дух моего кода на данный момент с фиктивные данные:
import queue
import threading
import concurrent.futures
import os
import random
import pandas as pd
import time
def process_chunk(df):
return df.groupby(["Category"])["Value"].sum().reset_index(drop=False)
def producer(queue, event):
print("Producer: Reading the file by chunks")
reader = pd.read_table(full_path, sep=";", chunksize=10000, names=["Row","Category","Value"])
for index, chunk in enumerate(reader):
print(f"Producer: Adding chunk #{index} to the queue")
queue.put((index, chunk))
time.sleep(0.2)
print("Producer: Finished putting chunks")
event.set()
print("Producer: Event set")
def consumer(queue, event, result_list):
# The consumer stops iff queue is empty AND event is set
# <=> The consumer keeps going iff queue is not empty OR event is not set
while not queue.empty() or not event.is_set():
try:
index, chunk = queue.get(timeout=1)
except queue.Empty:
continue
print(f"Consumer: Retrieved chunk #{index}")
print(f"Consumer: Queue size {queue.qsize()}")
result_list.append(process_chunk(chunk))
time.sleep(0.1)
print("Consumer: Finished retrieving chunks")
if __name__=="__main__":
# Record the execution time
start = time.perf_counter()
# Generate a fake file in the current directory if necessary
path = os.path.dirname(os.path.realpath(__file__))
filename = "fake_file.txt"
full_path = os.path.join(path, filename)
if not os.path.exists(full_path):
print("Main: Generate a dummy dataset")
with open(full_path, "w", encoding="utf-8") as f:
for i in range(100000):
value = random.randint(1,101)
category = i%2
f.write(f"{i+1};{value};{category}\n")
# Defining a queue that will store the chunks of the file read by the Producer
queue = queue.Queue(maxsize=5)
# Defining an event that will be set by the Producer when he is done
event = threading.Event()
# Defining a list storing the chunks processed by the Consumer
result_list = list()
# Launch the threads Producer and Consumer
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
executor.submit(producer, queue, event)
executor.submit(consumer, queue, event, result_list)
# Display that the program is finished
print("Main: Consumer & Producer have finished!")
print(f"Main: Number of processed chunks = {len(result_list)}")
print(f"Main: Execution time = {time.perf_counter()-start} seconds")
Я знаю, что каждая итерация шага 1 занимает больше времени, чем каждая итерация шага 2, т. е. что потребитель всегда будет ждать источника.
Как я могу ускорить процесс чтения моего файла кусками (шаг 1)?