Я пытаюсь прочитать большой файл и экспортировать пакеты файлов CSV с помощью Asyncio. Я знаю, что Asyncio не поддерживает asyn c IO для одного и того же файла, поэтому я пытаюсь выполнить экспорт в отдельные файлы для каждой задачи с указанием номера пакета. Но он работает только синхронно ..
У меня есть main.py, и у него есть функция def start()
def start():
asyncio.get_event_loop().run_until_complete(processing.test_async(dictRunData))
У меня есть processing.py и есть функция test_async()
async def test_async(dictRunData):
num_logical_cpus = multiprocessing.cpu_count()
with open(dictRunData['input_file'], 'r') as infile:
content = infile.read().replace('\n', '')
lstcontent = ast.literal_eval(content)
tasks = []
chunkNum = 0
chunk_contents = numpy.array_split(numpy.array(lstcontent), num_logical_cpus)
print(f"number of chunks: {len(chunk_contents)}")
for chunk in chunk_contents:
chunkNum += 1
task = asyncio.create_task(process_chunk_async(chunk, chunkNum))
tasks.append(task)
result = await asyncio.gather(*tasks, return_exceptions=True)
Вот функция обработки данного чанка.
async def process_chunk_async(chunk, chunkNum, dictRunData):
dict_results = {}
for data in chunk:
..do something..
dict_results.append(data)
outputfile = await write_chunk_async(dict_results, chunkNum, dictRunData)
Вот write_chunk_async
async def write_chunk_async(dict_results, chunkNum, dictRunData):
fileName = f"_{chunkNum}.csv"
wrtieFileTo = open(fileName,"a+")
for data in dict_results.keys():
wrtieFileTo.write(data + "\n")
wrtieFileTo.close()
print(f"Done write_chunk_async file: {fileName}")