создать зависимость между параллельными задачами в Python asyncio - PullRequest
0 голосов
/ 20 февраля 2019

У меня есть две задачи в отношениях потребителя / производителя, разделенные asyncio.Queue.Если задача производителя завершится неудачно, я бы хотел, чтобы задача потребителя также потерпела неудачу как можно скорее и не ожидала в очереди бесконечно.Потребительская задача может быть создана (порождена) независимо от задачи производителя.

В общих чертах, я хотел бы реализовать зависимость между двумя задачами, так что сбой одной из них также является отказом другой.при одновременном сохранении этих двух задач (т. е. одно не будет ожидать другого непосредственно).

Какие решения (например, шаблоны) можно использовать здесь?

Спасибо!

Обновление: в основном, я имею в виду erlang's "links" .

Я думаю, что возможно реализовать что-то подобное с помощью обратных вызовов, например asyncio.Task.add_done_callback

Ответы [ 3 ]

0 голосов
/ 20 февраля 2019

Из комментария:

Я стараюсь избегать поведения, когда потребитель не замечает смерти производителя и ждет в очереди бесконечно.Я хочу, чтобы потребитель был уведомлен о смерти производителя и имел возможность отреагировать.или просто потерпеть неудачу, и даже если он также ожидает в очереди.

Кроме ответа, представленного Игалом, другой способ - создать третью задачу, которая контролирует два, и отменяет один, когда другой заканчивает.Это можно обобщить для любых двух задач:

async def cancel_when_done(source, target):
    assert isinstance(source, asyncio.Task)
    assert isinstance(target, asyncio.Task)
    try:
        await source
    except:
        # SOURCE is a task which we expect to be awaited by someone else
        pass
    target.cancel()

Теперь при настройке производителя и потребителя вы можете связать их с помощью вышеуказанной функции.Например:

async def producer(q):
    for i in itertools.count():
        await q.put(i)
        await asyncio.sleep(.2)
        if i == 7:
            1/0

async def consumer(q):
    while True:
        val = await q.get()
        print('got', val)

async def main():
    loop = asyncio.get_event_loop()
    queue = asyncio.Queue()
    p = loop.create_task(producer(queue))
    c = loop.create_task(consumer(queue))
    loop.create_task(cancel_when_done(p, c))
    await asyncio.gather(p, c)

asyncio.get_event_loop().run_until_complete(main())
0 голосов
/ 21 февраля 2019

Другое возможное решение:

import asyncio
def link_tasks(t1: Union[asyncio.Task, asyncio.Future], t2: Union[asyncio.Task, asyncio.Future]):
    """
    Link the fate of two asyncio tasks,
    such that the failure or cancellation of one
    triggers the cancellation of the other
    """
    def done_callback(other: asyncio.Task, t: asyncio.Task):
        # TODO: log cancellation due to link propagation
        if t.cancelled():
            other.cancel()
        elif t.exception():
            other.cancel()
    t1.add_done_callback(functools.partial(done_callback, t2))
    t2.add_done_callback(functools.partial(done_callback, t1))

При этом используется asyncio.Task.add_done_callback для регистрации обратных вызовов, которые отменят другую задачу в случае сбоя или отмены одной из них.

0 голосов
/ 20 февраля 2019

Одним из способов будет распространение исключения через очередь в сочетании с делегированием обработки работы:

class ValidWorkLoad:
    async def do_work(self, handler):
        await handler(self)


class HellBrokeLoose:
    def __init__(self, exception):
        self._exception = exception

    async def do_work(self, handler):
        raise self._exception


async def worker(name, queue):
    async def handler(work_load):
        print(f'{name} handled')

    while True:
        next_work = await queue.get()
        try:
            await next_work.do_work(handler)
        except Exception as e:
            print(f'{name} caught exception: {type(e)}: {e}')
            break
        finally:
            queue.task_done()


async def producer(name, queue):
    i = 0
    while True:
        try:
            # Produce some work, or fail while trying
            new_work = ValidWorkLoad()
            i += 1
            if i % 3 == 0:
                raise ValueError(i)
            await queue.put(new_work)
            print(f'{name} produced')
            await asyncio.sleep(0)  # Preempt just for the sake of the example
        except Exception as e:
            print('Exception occurred')
            await queue.put(HellBrokeLoose(e))
            break


loop = asyncio.get_event_loop()
queue = asyncio.Queue(loop=loop)
producer_coro = producer('Producer', queue)
consumer_coro = worker('Consumer', queue)
loop.run_until_complete(asyncio.gather(producer_coro, consumer_coro))
loop.close()

Какие выходные данные:

Произведено производителем

Обработано потребителем

Произведено производителем

Обработано потребителем

Произошло исключение

Исключение получено потребителем:: 3

В качестве альтернативы вы можете пропустить делегирование и назначить элемент, который сигнализирует работнику об остановке.При обнаружении исключения в источнике вы помещаете этот назначенный элемент в очередь.

...