Asyncio не запускает все задачи одновременно в Python. пытается экспортировать CSV-файлы в пакетном режиме - PullRequest
0 голосов
/ 03 марта 2020

Я пытаюсь прочитать большой файл и экспортировать пакеты файлов CSV с помощью Asyncio. Я знаю, что Asyncio не поддерживает asyn c IO для одного и того же файла, поэтому я пытаюсь выполнить экспорт в отдельные файлы для каждой задачи с указанием номера пакета. Но он работает только синхронно ..

У меня есть main.py, и у него есть функция def start()

def start():
    asyncio.get_event_loop().run_until_complete(processing.test_async(dictRunData))

У меня есть processing.py и есть функция test_async()

async def test_async(dictRunData):
  num_logical_cpus = multiprocessing.cpu_count()
  with open(dictRunData['input_file'], 'r') as infile:
    content = infile.read().replace('\n', '')
    lstcontent = ast.literal_eval(content)

  tasks = []
  chunkNum = 0
  chunk_contents = numpy.array_split(numpy.array(lstcontent), num_logical_cpus)
  print(f"number of chunks: {len(chunk_contents)}")
  for chunk in chunk_contents:
    chunkNum += 1
    task = asyncio.create_task(process_chunk_async(chunk, chunkNum))
    tasks.append(task)

  result = await asyncio.gather(*tasks, return_exceptions=True)

Вот функция обработки данного чанка.

async def process_chunk_async(chunk, chunkNum, dictRunData):
    dict_results = {}
    for data in chunk:
       ..do something..
       dict_results.append(data)

    outputfile = await write_chunk_async(dict_results, chunkNum, dictRunData)

Вот write_chunk_async

async def write_chunk_async(dict_results, chunkNum, dictRunData):
    fileName = f"_{chunkNum}.csv"
    wrtieFileTo = open(fileName,"a+")

    for data in dict_results.keys():
        wrtieFileTo.write(data + "\n")

    wrtieFileTo.close()

    print(f"Done write_chunk_async file: {fileName}")

1 Ответ

2 голосов
/ 03 марта 2020

asyncio обеспечивает параллелизм, только если вы используете его API для выполнения асинхронного ввода-вывода. В вашем примере кода весь ваш ввод / вывод (чтение / запись файлов) выполняется с использованием синхронных, блокирующих API, поэтому использование asyncio не добавляет никакого значения. Теперь asyncio фактически не предоставляет никаких API для асинхронного чтения / записи файлов, потому что это не очень хорошо поддерживается на уровне операционной системы. См. это объяснение из Python вики.

Существует сторонняя библиотека, aiofiles, которая предоставляет дружественный asyncio API для файлового ввода-вывода, но она просто делегирует всю работу фоновым потокам под Обложки, так что на самом деле нет причин использовать его, вы не пытаетесь интегрировать файловый ввод-вывод в приложение, уже использующее asyncio. Если все ваше приложение делает чтение / запись файлов, просто используйте потоки напрямую. Имейте в виду, однако, что если все ваши потоки читают / записывают файлы на один и тот же диск, многопоточность тоже может не сильно помочь, поскольку в конечном итоге все потоки будут бороться друг с другом, пытаясь получить доступ к одному диску.

...