asyncio: выполнение задачи, только если все другие задачи ожидают - PullRequest
10 голосов
/ 26 мая 2019

В настоящее время я выполняю бесконечные задачи, используя asyncio.wait

Мне нужна специальная функция для запуска, когда все остальные включены await

import asyncio 

async def special_function():
    while True:
        # does some work, 
        # Passes control back to controller to run main_tasks
        # if they are no longer waiting.
        await asyncio.sleep(0)

async def handler():
    tasks = [task() for task in main_tasks]

    # Adding the task that I want to run when all main_tasks are awaiting:
    tasks.append(special_function())

    await asyncio.wait(tasks)

asyncio.get_event_loop().run_until_complete(handler())

Как заставить special_function запускаться только тогда, когда все main_tasks включены await?


Edit:

Что я имею в виду под "все main_tasks включены await": все main_tasks не готовы к продолжению, например, находятся в asyncio.sleep(100) или связаны с вводом-выводом и все еще ожидают данных.

Следовательно, main_tasks не может продолжаться, и цикл обработки запускает special_function, пока задачи находятся в этом состоянии, а НЕ на каждой итерации цикла события.


Редактировать 2:

Мой вариант использования:

main_tasks обновляет структуру данных новыми данными из веб-сокетов.

special_function передает эти данные другому процессу по сигналу обновления от этого процесса. (multiprocessing с общими переменными и структурами данных)

Это должны быть самые свежие данные, которые могут быть при передаче, не должно быть ожидающих обновлений от main_tasks.

Именно поэтому я хочу запускать special_function только тогда, когда нет main_tasks с новыми данными, доступными для обработки. (т. е. все ожидают await)

Ответы [ 5 ]

5 голосов
/ 30 мая 2019

Я попытался написать тест для условия «задача не готова к выполнению». Я думаю, что Asyncio не раскрывает детали из планировщика. Разработчики четко заявили, что хотят сохранить свободу для изменения внутренних компонентов asyncio без нарушения обратной совместимости.

В asyncio.Task есть этот комментарий (примечание: _step() запускает сопрограмму задачи до следующего ожидания):

# An important invariant maintained while a Task not done:
#   
# - Either _fut_waiter is None, and _step() is scheduled;
# - or _fut_waiter is some Future, and _step() is *not* scheduled.

Но эта внутренняя переменная, конечно же, отсутствует в API.

Вы можете получить некоторый ограниченный доступ к _fut_waiter, прочитав вывод repr(task), но формат, кажется, также не является надежным, поэтому я не буду зависеть от чего-то подобного:

PENDINGMSG = 'wait_for=<Future pending '

if all(PENDINGMSG in repr(t) for t in monitored_tasks):
    do_something()

В любом случае, я думаю, вы пытаетесь быть слишком совершенным. Вы хотите знать, есть ли новые данные в других задачах. Что если данные находятся в асинхронных буферах? Буфер ядра? Сетевая карта получает буфер? ... Вы никогда не сможете узнать, поступят ли новые данные в следующую миллисекунду.

Мое предложение: записывать все обновления в одну очередь. Проверьте эту очередь как единственный источник обновлений. Если очередь пуста, опубликуйте последнее состояние.

2 голосов
/ 04 июня 2019

Вот что я бы сделал:

  1. Я бы не использовал вашу специальную функцию.

  2. Каждое обновление данных требует отдельного идентификатора генерации (4-байтовое целое число), и я бы помещал этот идентификатор только в общую память.

Я полагаю, что оба процесса работают независимо.

  1. Абонент сохраняет идентификатор генерации как локальный. Когда он замечает, что идентификатор поколения изменяется в разделяемой памяти, тогда считываются новые данные из файла.

  2. Данные хранятся в tmpfs (/ tmp), поэтому они находятся в памяти. Вы можете создавать свои собственные tmpfs, если вам это подходит. Это быстро.

Вот почему:

  • Чтобы убедиться, что подписчик не извлекает полусухие данные в общей памяти, его необходимо защитить семафором. Это ПИТА
  • Используя файл, вы можете переносить данные переменного размера. Это может не относиться к вам. При использовании совместно используемой памяти трудно решить одну проблему, а не тратить ее. Использование файла решает эту проблему.
  • При использовании 4-байтового идентификатора поколения int идентификатор обновления является атомарным. Это огромное преимущество.

Итак, когда одна из ваших задач получает новые данные, открывает файл, записывает в него, и после закрытия дескриптора файла вы записываете идентификатор поколения в общую память. Перед обновлением идентификатора генерации вы можете безопасно удалить файл. Подписчик - если он открыл файл, он завершит чтение файла, и если он попытается открыть его, он не откроется, поэтому ему все равно придется ждать следующего поколения. Если машина выходит из строя, / tmp пропадает, поэтому вам не нужно беспокоиться об очистке файлов. Вы даже можете написать новую задачу, единственной задачей которой является удаление файлов в / tmp старшего поколения, если хотите.

1 голос
/ 26 мая 2019

Когда цикл обработки событий выполняет какую-либо задачу, эта задача выполняется до тех пор, пока она не вернет управление обратно в цикл обработки событий.Обычно есть только одна причина, по которой задача хочет вернуть управление циклу событий: задача, если она сталкивается с блокирующей операцией (и поэтому «не готова к продолжению»).

Это означает, что «каждая итерация цикла событий»обычно равны "все main_tasks на await".Код, который у вас уже есть, будет (в основном) работать так, как вы хотите.Единственное, что вам нужно сделать, это , чтобы выполнить special_function() задачу.


Есть некоторая случайная задача, возвращающая управление в цикл обработки событий перед тем, как столкнуться с "реальным" блокирующим вызовом, и обычно выглядитawait asyncio.sleep(0) (как вы делаете в special_function).Это означает, что задача хочет обеспечить выполнение всех остальных задач перед продолжением: вы, вероятно, хотите это соблюдать.

0 голосов
/ 31 мая 2019

Перенесите запросы ввода и вывода в PriorityQueue с приоритетным входом над выходом.Затем просто обработайте задачи из очереди в обычном режиме, и он всегда будет выполнять все ожидающие входные запросы перед любыми выходными.

Таким образом, ваш основной цикл будет состоять из чего-то вроде следующего:

  • InputListener1(ставит в очередь каждый InputTask1, полученный с приоритетом 0)
  • InputListener2 (ставит в очередь каждый InputTask2, полученный с приоритетом 0)
  • InputListener3 (ставит в очередь каждый InputTask3, полученный с приоритетом 0)
  • OutputListener (queuesкаждый OutputTask, полученный с приоритетом 1)
  • QueueWorker (обрабатывает следующую задачу из очереди)

Это, вероятно, означает, что вам придется разделить логику для всех существующих задач наотдельные слушатели сокетов и фактическая обработка задач, но это не обязательно плохо.

0 голосов
/ 29 мая 2019

Почему бы не использовать семафор

async def do_stuff(semaphore):
    async with semaphore:
      await getting_stuff_done()

async def wait_til_everyone_is_busy(semaphore):
    while not semaphore.locked():
      await asyncio.sleep(1)
    do_other_stuff()

Чтобы лучше проиллюстрировать мою точку зрения, возьмите этот тривиальный пример:

import asyncio
import time

async def process(semaphore, i):
    while True:
        print(f"{i} I'm gonna await")
        await asyncio.sleep(1)
        async with semaphore:
            print(f'{i} sleeping')
            await asyncio.sleep(3)
        print(f'{i} done sleeping')
        print(f"{i} I'm gonna await again")
        await asyncio.sleep(1)

async def other_process(semaphore):
    while True:
        while not semaphore.locked():
            print("Everyone is awaiting... but I'm not startingr")
            await asyncio.sleep(1)
        print("Everyone is busy, let's do this!")
        time.sleep(5)
        print('5 seconds are up, let everyone else play again')
        await asyncio.sleep(1)

semaphore = asyncio.Semaphore(10)
dataset = [i for i in range(10)]
loop = asyncio.new_event_loop()
tasks = [loop.create_task(process(semaphore, i)) for i in dataset]
tasks.append(loop.create_task(other_process(semaphore)))
loop.run_until_complete(asyncio.wait(tasks))

Мы создаем 10 задач, которые используютфункция "process", и та, которая использует "other_process".Тот, который выполняет «other_process», может запускаться только тогда, когда все остальные хранят семафор, и из-за способа переключения контекста Asyncio будет выполняться только функция «other_process», пока остальные находятся в состоянии ожидания, вплоть до«other_process» запускает собственный «await».

$ python3 tmp
0 I'm gonna await
1 I'm gonna await
2 I'm gonna await
3 I'm gonna await
4 I'm gonna await
5 I'm gonna await
6 I'm gonna await
7 I'm gonna await
8 I'm gonna await
9 I'm gonna await
Everyone is awaiting... but I'm not startingr
0 sleeping
1 sleeping
2 sleeping
3 sleeping
4 sleeping
5 sleeping
6 sleeping
7 sleeping
8 sleeping
9 sleeping
Everyone is busy, let's do this!
5 seconds are up, let everyone else play again
0 done sleeping
0 I'm gonna await again
1 done sleeping
1 I'm gonna await again
2 done sleeping
2 I'm gonna await again
3 done sleeping
3 I'm gonna await again
4 done sleeping
4 I'm gonna await again
5 done sleeping
5 I'm gonna await again
6 done sleeping
6 I'm gonna await again
7 done sleeping
7 I'm gonna await again
8 done sleeping
8 I'm gonna await again
9 done sleeping
9 I'm gonna await again
Everyone is awaiting... but I'm not startingr
0 I'm gonna await
1 I'm gonna await
2 I'm gonna await
3 I'm gonna await
4 I'm gonna await
5 I'm gonna await
6 I'm gonna await
7 I'm gonna await
8 I'm gonna await
9 I'm gonna await
Everyone is awaiting... but I'm not startingr
0 sleeping
1 sleeping
2 sleeping
3 sleeping
4 sleeping
5 sleeping
6 sleeping
7 sleeping
8 sleeping
9 sleeping
Everyone is busy, let's do this!
...