Как использовать asyncio модуль вызова службы nameko одновременно - PullRequest
0 голосов
/ 26 января 2019

Я пишу программу asyncio, как это.Вечный цикл запускает 4 события одновременно.Каждое событие будет запускать сервис rpc.В сервисе nameko я внедряю сервис с time.sleep(10).

. Я запутался, почему сервис завершается каждые 10 секунд.Я думаю, что обслуживание должно закончиться в то же время.Как я могу позволить завершить работу одновременно?

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()


async def job(x):
    try:
        with ClusterRpcProxy(CONFIG) as rpc:
            res = rpc.helloworldService.helloworld(x)
            print(res)
    except Exception as e:
        print(f"{e}")


async def do_sleep(x, queue):
        try:
             await job(x)
             queue.put("ok")
        except Exception as e:
            print(f"{e}")


def consumer():
    asyncio.run_coroutine_threadsafe(do_sleep('10', queue), new_loop)
    asyncio.run_coroutine_threadsafe(do_sleep('11', queue), new_loop)
    asyncio.run_coroutine_threadsafe(do_sleep('12', queue), new_loop)
    asyncio.run_coroutine_threadsafe(do_sleep('13', queue), new_loop)


if __name__ == '__main__':
    print(time.ctime())
    new_loop = asyncio.new_event_loop()

    loop_thread = Thread(target=start_loop, args=(new_loop,))
    loop_thread.setDaemon(True)
    loop_thread.start()

    CONFIG = {'AMQP_URI': "amqp://guest:guest@localhost"}
    queue = Queue()
    sema = asyncio.Semaphore(2)

    consumer_thread = Thread(target=consumer)
    consumer_thread.setDaemon(True)
    consumer_thread.start()

    while True:
        msg = queue.get()
        print("current:", time.ctime())

Служба nameko rpc:

class HelloWorld:
    name = 'helloworldService'

    @rpc
    def helloworld(self,str):
        time.sleep(10)
        return 'hello_'+str

И вывод следующий:

hello_10
current: Sat Jan 26 13:04:57 2019
hello_11
current: Sat Jan 26 13:05:07 2019
hello_12
current: Sat Jan 26 13:05:17 2019
hello_13
current: Sat Jan 26 13:05:28 2019

1 Ответ

0 голосов
/ 26 января 2019

Вы должны использовать ожидаемый сон вместо нежелательного time.sleep(). Таким образом, ваш nameko RPC сервис будет выглядеть следующим образом:

import asyncio

class HelloWorld:
    name = 'helloworldService'

    @rpc
    async def helloworld(self,str):  # Note
        await asyncio.sleep(10)  # Note
        return 'hello_'+str

И кусок кода вашего сервера:

async def job(x):
    try:
        with ClusterRpcProxy(CONFIG) as rpc:
            res = await rpc.helloworldService.helloworld(x)  # Note
            print(res)
    except Exception as e:
        print(f"{e}")

[Примечание]

  • Но ваша библиотека RPC должна быть реализована также asyncio.
  • Это библиотека Async asyncio RPC ( aiorpc ).
...