Я пытаюсь прочитать несколько файлов (CSV) с помощью asyncio, но я не хочу блокировать главное событие l oop при этом.
Итак, я проверил AIOfile, который, кажется, обещает, что чтение не блокирует. Хотя это может быть правдой, для выполнения следующего фрагмента требуется огромное количество времени, это в основном тот же пример отсюда https://github.com/mosquito/aiofile#read -file-line-by-line
import asyncio
from aiofile import AIOFile, LineReader
from pathlib import Path
import time
counter = 0
async def main():
path = 'test_data'
global counter
data_dir = Path(path)
files_in_basepath = (entry for entry in data_dir.iterdir() if entry.is_file())
list_of_files = [(path + '/' + file.name, file) for file in files_in_basepath]
for file in list_of_files:
line_count = 0
async with AIOFile(file[0]) as afp:
await afp.fsync()
async for line in LineReader(afp):
#print(line)
values = ''
line_values = line.split(',')
for item in line_values:
values = values + item + ' '
# print(values)
line_count += 1
print(f'Processed {line_count} lines in file {file[1].name}.')
counter += 1
start_time = time.time()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
duration = time.time() - start_time
print(f"Processed {counter} data files in {duration} seconds")
*
Это дает ужасную производительность, на 100 файлов требуется:
Обработка 100 файлов данных за 196,8809883594513 секунд
По сравнению с последовательной обработкой этих файлов это просто невероятно ...
Обработано 100 файлов данных за 0,9933180809020996 секунду
Так что мне было интересно, что здесь происходит, а также я видел в нескольких местах рекомендацию запускать операции ввода-вывода в исполнителе, чтобы событие l oop не блокировалось.
Просто упомяну, у меня есть другой код, который запускает это в потоках и работает почти так же хорошо, как и последовательный:
import concurrent.futures
import csv
import threading
import time
from pathlib import Path
c_lock = threading.Lock()
counter = 0
def read_data_file(files):
# Get the info from second item from tuple
info = files[1].stat()
global c_lock
global counter
c_lock.acquire()
print(info.st_mtime)
print(f'File name is {files[1].name} with size {round(info.st_size / float(1 << 10), 2)} KB')
with open(files[0]) as csv_file:
csv_reader = csv.reader(csv_file, delimiter=',')
line_count = 0
for row in csv_reader:
# Just assume we do something very interesting with these values...
values = ''
for item in row:
values = values + item + ' '
#print(values)
line_count += 1
print(f'Processed {line_count} lines in file {files[1].name}.')
counter += 1
c_lock.release()
def read_data_files(path):
# List all files in data folder
data_dir = Path(path)
files_in_basepath = (entry for entry in data_dir.iterdir() if entry.is_file())
list_of_files = []
for file in files_in_basepath:
list_of_files.append((path + '/' + file.name, file))
with concurrent.futures.ThreadPoolExecutor(max_workers=12) as executor:
executor.map(read_data_file, list_of_files)
if __name__ == "__main__":
data_files = 'test_data'
start_time = time.time()
read_data_files(data_files)
duration = time.time() - start_time
print(f"Processed {counter} data files in {duration} seconds")
Это дает следующее:
Обработано 100 файлов данных в 1.0079402923583984 секунды
Интересно, делаю ли я что-то не так с asyncio или мне следует вообще пропустить его ... Я просто пробую, что является наиболее эффективным ient способ обработки всех этих файлов, последовательный, многопоточный (включая asyncio) или многопроцессорный)