Как собрать результаты и использовать лимит с родительскими дочерними функциями - PullRequest
0 голосов
/ 21 сентября 2019

То, чего я пытаюсь добиться, - это что-то вроде порождения нескольких родителей, и каждый родитель выполняет определенную работу, а затем порождает нескольких детей, чтобы найти другие вещи и взять эти результаты у родителя, чтобы выполнить дальнейшую работу.Я также пытался установить 2 различных предела появления, потому что родительская работа может делать больше, чем дочерние.

Как бы я это сделал?

Это работает, если я не использую limit2, ноЯ хотел бы иметь два ограничителя.

import trio
import asks
import time
import random

async def child(parent, i, sender, limit2):
    async with limit2:
        print('Parent {0}, Child {1}: started! Sleeping now...'.format(parent, i))
        #await trio.sleep(random.randrange(0, 3))
        print('Parent {0}, Child {1}: exiting!'.format(parent, i))
        async with sender:
            await sender.send('Parent {0}, Child {1}: exiting!'.format(parent, i))

async def parent(i, limit):
    async with limit:
        print('Parent {0}: started! Sleeping now...'.format(i))
        #await trio.sleep(random.randrange(0, 3))

        sender, receiver = trio.open_memory_channel(10)
        limit2 = trio.CapacityLimiter(2)
        async with trio.open_nursery() as nursery:
            for j in range(10):
                nursery.start_soon(child, i, j, sender, limit2)

        async with receiver:
            async for value in receiver:
                print('Got value: {!r}'.format(value))
        print('Parent {0}: exiting!'.format(i))

async def main():
    limit = trio.CapacityLimiter(1)
    async with trio.open_nursery() as nursery:
        for i in range(1):
            nursery.start_soon(parent, i, limit)


if __name__ == "__main__":
    start_time = time.perf_counter()
    trio.run(main)
    duration = time.perf_counter() - start_time
    print("Took {:.2f} seconds".format(duration))

1 Ответ

0 голосов
/ 23 сентября 2019

Когда я запускаю ваш код, я получаю:

  File "/tmp/zigb.py", line 12, in child
    await sender.send('Parent {0}, Child {1}: exiting!'.format(parent, i))
  File "/home/njs/.user-python3.7/lib/python3.7/site-packages/trio/_channel.py", line 157, in send
    self.send_nowait(value)
  File "/home/njs/.user-python3.7/lib/python3.7/site-packages/trio/_core/_ki.py", line 167, in wrapper
    return fn(*args, **kwargs)
  File "/home/njs/.user-python3.7/lib/python3.7/site-packages/trio/_channel.py", line 135, in send_nowait
    raise trio.ClosedResourceError
trio.ClosedResourceError

Здесь происходит то, что вы передаете канал sender во все 10 дочерних задач, а затем каждая дочерняя задача выполняетasync with sender: ..., который закрывает канал sender.Итак, сначала одно задание использует его, а затем закрывает, а затем следующее задание пытается его использовать ... но оно уже закрыто, поэтому возникает ошибка.

К счастью, Trio предлагает решение для точногоэта проблема: вы можете использовать метод clone для объекта канала памяти, чтобы создать вторую копию этого канала памяти, которая работает точно так же, но закрывается независимо.Таким образом, хитрость заключается в том, чтобы передать каждому дочернему элементу клон sender, а затем каждый из них закрывает свой клон, а затем, как только все клоны закрываются, получатель получает уведомление и прекращает цикл.

Документы:https://trio.readthedocs.io/en/stable/reference-core.html#managing-multiple-producers-and-or-multiple-consumers

Исправлена ​​версия вашего кода:

import trio
import asks
import time
import random

async def child(parent, i, sender, limit2):
    async with limit2:
        print('Parent {0}, Child {1}: started! Sleeping now...'.format(parent, i))
        #await trio.sleep(random.randrange(0, 3))
        print('Parent {0}, Child {1}: exiting!'.format(parent, i))
        async with sender:
            await sender.send('Parent {0}, Child {1}: exiting!'.format(parent, i))

async def parent(i, limit):
    async with limit:
        print('Parent {0}: started! Sleeping now...'.format(i))
        #await trio.sleep(random.randrange(0, 3))

        sender, receiver = trio.open_memory_channel(10)
        limit2 = trio.CapacityLimiter(2)
        async with trio.open_nursery() as nursery:
            for j in range(10):
                # CHANGED: Give each child its own clone of 'sender', which
                # it will close when it's done
                nursery.start_soon(child, i, j, sender.clone(), limit2)
        # CHANGED: Close the original 'sender', once we're done making clones
        await sender.aclose()

        async with receiver:
            async for value in receiver:
                print('Got value: {!r}'.format(value))
        print('Parent {0}: exiting!'.format(i))

async def main():
    limit = trio.CapacityLimiter(1)
    async with trio.open_nursery() as nursery:
        for i in range(1):
            nursery.start_soon(parent, i, limit)


if __name__ == "__main__":
    start_time = time.perf_counter()
    trio.run(main)
    duration = time.perf_counter() - start_time
    print("Took {:.2f} seconds".format(duration))
...