Как написать свою собственную функцию асинхронной / ожидаемой сопрограммы в Python? - PullRequest
3 голосов
/ 07 октября 2019

Я пытаюсь написать свою собственную функцию awaiatbale, которая могла бы использовать в асинхронном цикле, таком как метод asyncio.sleep() или что-то вроде этих предварительно ожидаемых реализованных методов.

Вот что я сделал до сих пор:

import asyncio

def coro1():
    for i in range(1, 10):
        yield i

def coro2():
    for i in range(1, 10):
        yield i*10

class Coro:  # Not used.
    def __await__(self):
        for i in range(1, 10):
            yield i * 100

@asyncio.coroutine
def wrapper1():
    return (yield from coro1())

@asyncio.coroutine
def wrapper2():
    return (yield from coro2())

for i in wrapper1():
    print(i)

print("Above result was obvious which I can iterate around a couroutine.".center(80, "#"))

async def async_wrapper():
    await wrapper1()
    await wrapper2()

loop = asyncio.get_event_loop()
futures = [asyncio.ensure_future(async_wrapper())]
result = loop.run_until_complete(asyncio.gather(*futures))
print(result)

loop.close()

Что я получил в результате:

1
2
3
4
5
6
7
8
9
#######Above result was obvious which I can iterate around a couroutine.#########
Traceback (most recent call last):
  File "stack-coroutine.py", line 36, in <module>
    result = loop.run_until_complete(asyncio.gather(*futures))
  File "/usr/lib/python3.6/asyncio/base_events.py", line 484, in run_until_complete
    return future.result()
  File "stack-coroutine.py", line 30, in async_wrapper
    await wrapper1()
  File "stack-coroutine.py", line 18, in wrapper1
    return (yield from coro1())
  File "stack-coroutine.py", line 5, in coro1
    yield i
RuntimeError: Task got bad yield: 1

Что я ожидаю в результате:

1
10
2
20
3
30
.
.
.

[ ПРИМЕЧАНИЕ]:

  • Я не ищу многопоточный или многопроцессорный метод.
  • Этот Вопрос почти аналогичен моему вопросу, который не был решенпока.
  • Я использую Python3.6

Ответы [ 2 ]

1 голос
/ 07 октября 2019

Обычно вам не нужно писать сопрограммы низкого уровня, используя async def, и ожидание внутри него является обычным способом достижения вашей цели.

Однако, если вы заинтересованы в деталях реализации, вот исходный код из asyncio.sleep().

Подобно многим другим низкоуровневым асинхронным функциям, для реализации сопрограммы используются 3 основных момента:

  • asyncio.Future () - "мост" между callbacks-world и coroutines-world
  • метод loop.call_later() цикла событий - включение из нескольких методов цикла событий, напрямую сообщающих циклу событий, когдасделать что-то
  • async def и await - просто синтаксический сахар для @asyncio.coroutine и yield from, который позволяет привести некоторую функцию к генератору (и выполнить ее "один шаг за раз ")

Вот моя грубая реализация сна, которая демонстрирует идею:

import asyncio


# @asyncio.coroutine - for better tracebacks and edge cases, we can avoid it here
def my_sleep(delay):
    fut = asyncio.Future()

    loop = asyncio.get_event_loop()
    loop.call_later(
        delay,
        lambda *_: fut.set_result(True)
    )

    res = yield from fut
    return res


# Test:
@asyncio.coroutine
def main():
    yield from my_sleep(3)
    print('ok')


asyncio.run(main())

Если вы хотите пойти ниже этого уровня, выдолжны понимать, как генераторы (или сопрограммы) управляются циклом событий, Видео , упомянутое пользователем 4815162342 - это хорошее место для начала.

Но, опять же, все вышеизложенное - это детали реализации. Вам не нужно думать обо всем этом, если вы не пишете что-то очень-очень низкое.

0 голосов
/ 02 ноября 2019

Я обнаружил параллельный / асинхронный подход с использованием генераторов. Однако, это не asyncio подход:

from collections import deque

def coro1():
    for i in range(1, 5):
        yield i

def coro2():
    for i in range(1, 5):
        yield i*10

print('Async behaviour using default list with O(n)'.center(60, '#'))
tasks = list()
tasks.extend([coro1(), coro2()])

while tasks:
    task = tasks.pop(0)
    try:
        print(next(task))
        tasks.append(task)
    except StopIteration:
        pass

print('Async behaviour using deque with O(1)'.center(60, '#'))
tasks = deque()
tasks.extend([coro1(), coro2()])

while tasks:
    task = tasks.popleft()  # select and remove a task (coro1/coro2).
    try:
        print(next(task))
        tasks.append(task)  # add the removed task (coro1/coro2) for permutation.
    except StopIteration:
        pass

Out:

########Async behaviour using default list with O(n)########
1
10
2
20
3
30
4
40
###########Async behaviour using deque with O(1)############
1
10
2
20
3
30
4
40

[ОБНОВЛЕНИЕ]:

Наконец, я решилэтот пример через синтаксис AsyncIO:

import asyncio

async def coro1():
    for i in range(1, 6):
        print(i)
        await asyncio.sleep(0)  # switches task every one iteration.

async def coro2():
    for i in range(1, 6):
        print(i * 10)
        await asyncio.sleep(0)  # switches task every one iteration.

loop = asyncio.get_event_loop()
futures = [
    asyncio.ensure_future(coro1()),
    asyncio.ensure_future(coro2())
]
loop.run_until_complete(asyncio.gather(*futures))
loop.close()

Out:

1
10
2
20
3
30
4
40
5
50

И другой сопрограммный метод параллелизма через выражение async-await и менеджер цикла событий на основе Очередь кучи * Алгоритм 1021 * , без использования библиотеки asyncio и ее цикла обработки событий и без asyncio.sleep() метод:

import heapq
from time import sleep
from datetime import datetime, timedelta

class Sleep:
    def __init__(self, seconds):
        self.sleep_until = datetime.now() + timedelta(seconds=seconds)

    def __await__(self):
        yield self.sleep_until

async def coro1():
    for i in range(1, 6):
        await Sleep(0)
        print(i)

async def coro2():
    for i in range(1, 6):
        await Sleep(0)
        print(i * 10)

def coro_manager(*coros):
    coros = [(datetime.now(), coro) for coro in coros]
    heapq.heapify(coros)
    while coros:
        exec_at, coro = heapq.heappop(coros)
        if exec_at > datetime.now():
            sleep((exec_at - datetime.now()).total_seconds())
        try:
            heapq.heappush(coros, (coro.send(None), coro))
        except StopIteration:
            try:
                coros.remove(coro)
            except ValueError:
                pass

coro_manager(coro1(), coro2())

Out:

1
10
2
20
3
30
4
40
5
50
...