Обработка огромного файла (> 30 ГБ) в Python - PullRequest
0 голосов
/ 20 февраля 2020

Мне нужно обработать огромный файл размером около 30 ГБ, содержащий сотни миллионов строк. Точнее, я хочу выполнить три следующих шага:

  1. Чтение файла по частям: учитывая размер файла, у меня нет памяти для чтения файла в одном go;

  2. Вычисление содержимого чанков перед агрегацией каждого из них до более управляемого размера;

  3. Объединение агрегированных чанков в окончательный набор данных содержащий результаты моих анализов.

До сих пор я кодировал два потока:

  • Один поток отвечает за чтение файла чанками и сохранение чанки в очереди (шаг 1);
  • Один поток отвечает за выполнение анализа (шаг 2) чанков;

Вот дух моего кода на данный момент с фиктивные данные:

import queue
import threading
import concurrent.futures
import os
import random
import pandas as pd
import time

def process_chunk(df):
    return df.groupby(["Category"])["Value"].sum().reset_index(drop=False)

def producer(queue, event):
    print("Producer: Reading the file by chunks")
    reader = pd.read_table(full_path, sep=";", chunksize=10000, names=["Row","Category","Value"])
    for index, chunk in enumerate(reader):
        print(f"Producer: Adding chunk #{index} to the queue")
        queue.put((index, chunk))
        time.sleep(0.2)
    print("Producer: Finished putting chunks")
    event.set()
    print("Producer: Event set")

def consumer(queue, event, result_list):
    # The consumer stops iff queue is empty AND event is set
    # <=> The consumer keeps going iff queue is not empty OR event is not set
    while not queue.empty() or not event.is_set():
        try:
            index, chunk = queue.get(timeout=1)
        except queue.Empty:
            continue
        print(f"Consumer: Retrieved chunk #{index}")
        print(f"Consumer: Queue size {queue.qsize()}")
        result_list.append(process_chunk(chunk))
        time.sleep(0.1)
    print("Consumer: Finished retrieving chunks")

if __name__=="__main__":
    # Record the execution time
    start = time.perf_counter()

    # Generate a fake file in the current directory if necessary
    path = os.path.dirname(os.path.realpath(__file__))
    filename = "fake_file.txt"
    full_path = os.path.join(path, filename)
    if not os.path.exists(full_path):
        print("Main: Generate a dummy dataset")
        with open(full_path, "w", encoding="utf-8") as f:
            for i in range(100000):
                value = random.randint(1,101)
                category = i%2
                f.write(f"{i+1};{value};{category}\n")

    # Defining a queue that will store the chunks of the file read by the Producer
    queue = queue.Queue(maxsize=5)

    # Defining an event that will be set by the Producer when he is done
    event = threading.Event()

    # Defining a list storing the chunks processed by the Consumer
    result_list = list()

    # Launch the threads Producer and Consumer
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, queue, event)
        executor.submit(consumer, queue, event, result_list)

    # Display that the program is finished
    print("Main: Consumer & Producer have finished!")
    print(f"Main: Number of processed chunks = {len(result_list)}")
    print(f"Main: Execution time = {time.perf_counter()-start} seconds")

Я знаю, что каждая итерация шага 1 занимает больше времени, чем каждая итерация шага 2, т. е. что потребитель всегда будет ждать источника.

Как я могу ускорить процесс чтения моего файла кусками (шаг 1)?

...