Поскольку ни numpy, ни pandas io не знают о asyncio, это может быть лучшим вариантом использования для потоков, чем для asyncio. (Кроме того, решения на основе Asyncio будут использовать потоки за кулисами в любом случае.)
Например, этот код порождает поток записи, в который вы отправляете работу, используя очередь:
import threading, queue
to_write = queue.Queue()
def writer():
# Call to_write.get() until it returns None
for df in iter(to_write.get, None):
with open('result.csv', 'a') as f:
df.to_csv(f, header=False, index=False)
threading.Thread(target=writer).start()
for name, group in data.groupby('Date'):
df = lot_of_numpy_calculations(group)
to_write.put(df)
# enqueue None to instruct the writer thread to exit
to_write.put(None)
Обратите внимание, что, если запись оказывается последовательно медленнее, чем вычисление, очередь будет продолжать накапливать кадры данных, которые могут в итоге потреблять много памяти. В этом случае обязательно укажите максимальный размер очереди, передав аргумент maxsize
конструктору .
Кроме того, учтите, что повторное открытие файла для каждой записи может замедлить запись. Если объем записанных данных невелик, возможно, вы могли бы повысить производительность, открыв файл заранее.