Чтение файлов параллельно и параметризация параметров класса - PullRequest
0 голосов
/ 04 сентября 2018

Предположим, у меня есть класс, и я хочу прочитать несколько файлов с диска параллельно и параметризировать параметры класса. Какой самый правильный способ сделать это (и как)?

  • Главный поток должен дождаться завершения действия load_data (), прежде чем что-либо еще произойдет.

Я думал о потоке, так как это только действия ввода / вывода.

Пример непараллельной реализации (1-Threading):

import pandas as pd


class DataManager(object):
    def __init__(self):
        self.a = None
        self.b = None
        self.c = None
        self.d = None
        self.e = None
        self.f = None

    def load_data(self):
        self.a = pd.read_csv('a.csv')
        self.b = pd.read_csv('b.csv')
        self.c = pd.read_csv('c.csv')
        self.d = pd.read_csv('d.csv')
        self.e = pd.read_csv('e.csv')
        self.f = pd.read_csv('f.csv')

if __name__ == '__main__':
    dm = DataManager()
    dm.load_data()
    # Main thread is waiting for load_data to finish.
    print("finished loading data")

Ответы [ 2 ]

0 голосов
/ 04 сентября 2018

Возможное решение с Python3 ThreadPoolExecutor

    from concurrent.futures import ThreadPoolExecutor
    import queue
    import pandas as pd

    def load_data_worker(data_queue, file_name):
        data_queue.put(pd.read_csv(file_name))

    class DataManager(object):
        def __init__(self):
            self.data_queue = queue.Queue()
            self.data_arr = []

        def load_data(self):
            with ThreadPoolExecutor() as executor:
                executor.submit(load_data_woker, self.data_queue, 'a.csv')
                executor.submit(load_data_woker, self.data_queue, 'b.csv')
                # ... 
                executor.submit(load_data_woker, self.data_queue, 'f.csv')
           # dumping Queue of loaded data to array 
           self.data_arr = list(self.data_queue.queue)



    if __name__ == '__main__':
        dm = DataManager()
        dm.load_data()
        # Main thread is waiting for load_data to finish.
        print("finished loading data")
0 голосов
/ 04 сентября 2018

Операции ввода-вывода в большинстве случаев не ограничены ЦП, поэтому использование нескольких процессов является излишним. Использование нескольких потоков может быть полезным, но pb.read_csv не только читает файл, но и анализирует его, что может быть ограничено ЦП. Я предлагаю вам читать файлы с диска с помощью asyncio, как только это было изначально сделано для этой цели. Вот код для этого:

import asyncio
import aiofiles


async def read_file(file_name):
    async with aiofiles.open(file_name, mode='rb') as f:
        return await f.read()


def read_files_async(file_names: list) -> list:
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(
        asyncio.gather(*[read_file(file_name) for file_name in file_names]))


if __name__ == '__main__':
    contents = read_files_async([f'files/file_{i}.csv' for i in range(10)])
    print(contents)

Функция read_files_async возвращает список содержимого файла (байтовые буферы), который вы можете передать pd.read_csv.

Я думаю, что оптимизации чтения файлов должно быть достаточно, но вы можете анализировать содержимое файлов параллельно с несколькими процессами (потоки и асинхронность не увеличат производительность процесса анализа):

import multiprocessing as mp

NUMBER_OF_CORES = 4
pool = mp.Pool(NUMBER_OF_CORES)
pool.map(pb.read_csv, contents)

Вы должны установить NUMBER_OF_CORES в соответствии со спецификацией вашей машины.

...