Трио: чтение нескольких задач с одного и того же диска - PullRequest
0 голосов
/ 18 сентября 2018

У меня есть файловый дескриптор, и я хотел бы прочитать его с несколькими задачами.Каждый запрос read () на fd будет возвращать полный, независимый пакет данных (пока данные доступны).

Моя наивная реализация заключалась в том, чтобы каждый работник выполнял следующий цикл:

async def work_loop(fd):
   while True:
     await trio.hazmat.wait_readable(fd)
     buf = os.read(fd, BUFSIZE)
     if not buf:
         break
     await do_work(buf)

К сожалению, это не работает, потому что трио повышает ResourceBusyError, если несколько задач блокируются на одном и том же fd.Поэтому моей следующей итерацией было написать пользовательскую функцию ожидания:

async def work_loop(fd):
   while True:
     await my_wait_readable(fd)
     buf = os.read(fd, BUFSIZE)
     if not buf:
         break
     await do_work(buf)

, где

read_queue = trio.hazmat.ParkingLot()
async def my_wait_readable():
    if name is None:
        name = trio.hazmat.current_task().name
    while True:
        try:
            log.debug('%s: Waiting for fd to become readable...', name)
            await trio.hazmat.wait_readable(fd)
        except trio.ResourceBusyError:
            log.debug('%s: Resource busy, parking in read queue.', name)
            await read_queue.park()
            continue
        log.debug('%s: fd readable, unparking next task.', name)
        read_queue.unpark()
        break

Однако в тестах я получаю такие сообщения og:

2018-09-18 13:09:17.219 pyfuse3-worker-37: Waiting for fd to become readable...
2018-09-18 13:09:17.219 pyfuse3-worker-47: Waiting for fd to become readable...
2018-09-18 13:09:17.220 pyfuse3-worker-53: Waiting for fd to become readable...
2018-09-18 13:09:17.220 pyfuse3-worker-51: fd readable, unparking next task.
2018-09-18 13:09:17.220 pyfuse3-worker-51: doing work
2018-09-18 13:09:17.221 pyfuse3-worker-47: Resource busy, parking in read queue.
2018-09-18 13:09:17.221 pyfuse3-worker-37: Resource busy, parking in read queue.
2018-09-18 13:09:17.221 pyfuse3-worker-53: Resource busy, parking in read queue.

Другими словами:

  1. Все задачи введите trio.hazmat.wait_readable
  2. Одна задача успешно возвращается и пытается отменить следующую задачу (но ее нет)
  3. другие задачи получают BusyError и паркуются сами
  4. Ничего не происходит, так как все рабочие припаркованы

Как правильно решить эту проблему?

1 Ответ

0 голосов
/ 19 сентября 2018

Несколько читателей с одного и того же fd не имеют смысла, использование Trio (или нет) не меняет этот базовый факт.Почему вы пытаетесь сделать это в первую очередь?

Если по какой-то причине вам действительно требуется параллельное выполнение нескольких задач для последующей обработки данных, используйте одну задачу чтения, чтобы добавить данные в очередь и позволить вашемузадачи обработки получают свои данные от этого.

Альтернативно, вы можете использовать блокировку:

read_lock = trio.Lock()
async def work_loop(fd):
   while True:
     async with read_lock:
        await trio.hazmat.wait_readable(fd)
        buf = os.read(fd, BUFSIZE)
     if not buf:
         break
     await do_work(buf)
...