asyn c файл, читаемый с помощью AIOfile - PullRequest
0 голосов
/ 21 июня 2020

Я пытаюсь прочитать несколько файлов (CSV) с помощью asyncio, но я не хочу блокировать главное событие l oop при этом.

Итак, я проверил AIOfile, который, кажется, обещает, что чтение не блокирует. Хотя это может быть правдой, для выполнения следующего фрагмента требуется огромное количество времени, это в основном тот же пример отсюда https://github.com/mosquito/aiofile#read -file-line-by-line

import asyncio
from aiofile import AIOFile, LineReader
from pathlib import Path
import time

counter = 0

async def main():
    path = 'test_data'
    global counter
    data_dir = Path(path)
    files_in_basepath = (entry for entry in data_dir.iterdir() if entry.is_file())
    list_of_files = [(path + '/' + file.name, file) for file in files_in_basepath]
    for file in list_of_files:
        line_count = 0
        async with AIOFile(file[0]) as afp:
            await afp.fsync()
            async for line in LineReader(afp):
                #print(line)
                values = ''
                line_values = line.split(',')
                for item in line_values:
                    values = values + item + ' '
                # print(values)
                line_count += 1
        print(f'Processed {line_count} lines in file {file[1].name}.')
        counter += 1

start_time = time.time()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
duration = time.time() - start_time
print(f"Processed {counter} data files in {duration} seconds")
*

Это дает ужасную производительность, на 100 файлов требуется:

Обработка 100 файлов данных за 196,8809883594513 секунд

По сравнению с последовательной обработкой этих файлов это просто невероятно ...

Обработано 100 файлов данных за 0,9933180809020996 секунду

Так что мне было интересно, что здесь происходит, а также я видел в нескольких местах рекомендацию запускать операции ввода-вывода в исполнителе, чтобы событие l oop не блокировалось.

Просто упомяну, у меня есть другой код, который запускает это в потоках и работает почти так же хорошо, как и последовательный:

import concurrent.futures
import csv
import threading
import time
from pathlib import Path

c_lock = threading.Lock()
counter = 0

def read_data_file(files):
    # Get the info from second item from tuple
    info = files[1].stat()
    global c_lock
    global counter
    c_lock.acquire()
    print(info.st_mtime)
    print(f'File name is {files[1].name} with size {round(info.st_size / float(1 << 10), 2)} KB')
    with open(files[0]) as csv_file:
        csv_reader = csv.reader(csv_file, delimiter=',')
        line_count = 0
        for row in csv_reader:
            # Just assume we do something very interesting with these values...
            values = ''
            for item in row:
                values = values + item + ' '
            #print(values)
            line_count += 1
        print(f'Processed {line_count} lines in file {files[1].name}.')
    counter += 1
    c_lock.release()

def read_data_files(path):
    # List all files in data folder
    data_dir = Path(path)
    files_in_basepath = (entry for entry in data_dir.iterdir() if entry.is_file())
    list_of_files = []
    for file in files_in_basepath:
        list_of_files.append((path + '/' + file.name, file))
    with concurrent.futures.ThreadPoolExecutor(max_workers=12) as executor:
        executor.map(read_data_file, list_of_files)


if __name__ == "__main__":
    data_files = 'test_data'
    start_time = time.time()
    read_data_files(data_files)
    duration = time.time() - start_time
    print(f"Processed {counter} data files in {duration} seconds")

Это дает следующее:

Обработано 100 файлов данных в 1.0079402923583984 секунды

Интересно, делаю ли я что-то не так с asyncio или мне следует вообще пропустить его ... Я просто пробую, что является наиболее эффективным ient способ обработки всех этих файлов, последовательный, многопоточный (включая asyncio) или многопроцессорный)

1 Ответ

1 голос
/ 22 июня 2020

Ваш многопоточный код блокирует все read_data_file гигантской блокировкой, заставляя его выполняться последовательно, в результате чего многопоточная версия работает не лучше, чем последовательная.

Версия asyncio также работает последовательно из-за того, что код не использует asyncio.gather или аналогичный для его распараллеливания. Что касается того, почему он в 200 раз медленнее, чем обычная последовательная версия, это может быть хорошим вопросом для разработчиков aiofiles. Я подозреваю, что каждая операция чтения строки отдельно передается внутреннему потоку, что замедляет его из-за большого объема бухгалтерии в таком горячем потоке. oop.

В итоге:

  • если ваша bottle -neck - это скорость ввода-вывода, вы можете что-то получить, используя несколько потоков, если вы позаботитесь о том, чтобы не делать вещи последовательными из-за ненужной блокировки. (GIL не будет проблемой, потому что он автоматически запускается во время операций ввода-вывода.)

  • если ваша bottle -neck - это скорость процессора, вы, вероятно, захотите исследовать несколько обработка, поскольку несколько потоков не помогут из-за GIL. Например, при чтении файлов CSV время, необходимое для анализа содержимого файла и его преобразования в числа, может затмить время, необходимое для его чтения с диска, особенно если файлы кэшированы.

  • asyncio и aiofiles, скорее всего, не помогут вам со скоростью обработки файлов CSV. aiofiles наиболее полезен при интеграции чтения файлов, которые могут «застрять» (например, потому что они могут читать с сетевого диска, которого больше нет). В текущей реализации это бесполезно для чтения файлов, где требуется высокая пропускная способность.

TL; DR попытаться получить ускорение с помощью потоков правильно, и если это не сработает , используя многопроцессорность.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...