Как Asyn c мариновать много файлов с aiofiles? - PullRequest
2 голосов
/ 15 января 2020

У меня есть список, который я хотел бы написать, data, по одному файлу для каждого элемента, например:

for i,chunk in enumerate(data):
    fname = ROOT / f'{i}.in'
    with open(fname, "wb") as fout:
        dill.dump(chunk, fout)

Поскольку список данных может быть довольно длинным, и я пишу в сетевое хранилище, я трачу много времени на ожидание итерации в NFS туда-сюда, и я хотел бы сделать это асинхронно, если это возможно.

У меня есть кое-что, что в основном похоже на это сейчас :

import dill
import asyncio
import aiofiles
from pathlib import Path

ROOT = Path("/tmp/")

data = [str(i) for i in range(500)]

def serialize(data):
  """
  Write my data out in serial
  """
  for i,chunk in enumerate(data):
    fname = ROOT / f'{i}.in'
    print(fname)
    with open(fname, "wb") as fout:
        dill.dump(chunk, fout)

def aserialize(data):
  """
  Same as above, but writes my data out asynchronously
  """
  fnames = [ROOT / f'{i}.in' for i in range(len(data))]
  chunks = data
  async def write_file(i):
    fname = fnames[i]
    chunk = chunks[i]
    print(fname)
    async with aiofiles.open(fname, "wb") as fout:
        print(f"written: {i}")
        dill.dump(chunk, fout)
        await fout.flush()
  loop = asyncio.get_event_loop()
  loop.run_until_complete(asyncio.gather(*[write_file(i) for i in range(len(data))]))

Теперь, когда я проверяю записи, это выглядит достаточно быстро, чтобы быть полезным для моей NFS:

# test 1
start = datetime.utcnow()
serialize(data)
end = datetime.utcnow()
print(end - start)
# >>> 0:02:04.204681

# test 3
start = datetime.utcnow()
aserialize(data)
end = datetime.utcnow()
print(end - start)
# >>> 0:00:27.048893
# faster is better.

Но когда я фактически / де / -сериализую данные, я написал, я вижу, что, возможно, это было быстро, потому что он ничего не писал:

def deserialize(dat):
  tmp = []
  for i in range(len(dat)):
    fname = ROOT / f'{i}.in'
    with open(fname, "rb") as fin:
      fo = dill.load(fin)
    tmp.append(fo)
  return tmp

serialize(data)
d2 = deserialize(data)
d2 == data
# True

Хорошо, тогда как:

aserialize(data)
d3 = deserialize(data)
>>> Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 6, in deserialize
  File "...python3.7/site-packages/dill/_dill.py", line 305, in load
    obj = pik.load()
EOFError: Ran out of input

То есть асинхронно записанные файлы пусты. Неудивительно, что это было так быстро.

Как я могу асинхронно укрупнить / протолкнуть свой список в файлы и заставить их фактически писать? Я предполагаю, что мне нужно как-то дождаться dill.dump? Я думал, что fout.flu sh справится с этим, но, похоже, нет.

1 Ответ

2 голосов
/ 03 февраля 2020

Я изменил строку dill.dump(chunk, fout) на await fout.write(dill.dumps(chunk)), получил данные, записанные в файлы и правильно десериализованный. Похоже, dill.dump работает только с обычными синхронными файлами, вызывающими метод file.write без ключевого слова await.

...