Обновление контекста на сервере Sani c с клиентом aiohttp - PullRequest
0 голосов
/ 24 марта 2020

Я пытаюсь разработать веб-сервер с использованием Sani c. Тем временем я хочу иметь возможность использовать контекст с помощью клиента aiohttp. По некоторым причинам мне не удается заставить все работать.

Я следовал некоторым выводам из здесь . Однако со следующим кодом:

import os
from multiprocessing import Queue, Process

import aiohttp
from sanic import Sanic
from sanic.request import Request
from sanic.response import json, HTTPResponse

import pprint

import asyncio


class Client:
    @classmethod
    async def create(cls, loop):
        self = Client()

        ssl = os.getenv("DEBUG", "True") == "True"
        self._tcp_connector = aiohttp.TCPConnector(verify_ssl=ssl)
        self._client_session = aiohttp.ClientSession(
            connector=self._tcp_connector, loop=loop
        )  # Must call self._client_session.close() at the end

        return self

    async def get(self, session, url):
        async with self._client_session.get(url) as response:
            return await response.json()

    async def fetch_cats_facts(self):
        res = await self.get(
            self._client_session, "https://cat-fact.herokuapp.com/facts"
        )
        return res


class Server:
    def __init__(self):
        self._app = Sanic(__name__)

        @self._app.listener("before_server_start")
        async def before_server_start(app, loop):
            self._client = await Client.create(loop)
            self._app.add_task(self.refresh_context())

        @self._app.route("/v1/init")
        def init(request: Request):
            return json({"hey": "you"})

    async def refresh_context(self):
        def wrapper(q: Queue):
            async def refresh_facts():
                facts = await self._client.fetch_cats_facts()
                q.put(models)

            asyncio.run(refresh_facts())

        while True:
            q = Queue()
            p = Process(target=wrapper, args=(q,))
            p.start()

            pprint.pprint(q.get())

            p.join()
            await asyncio.sleep(REFRESH_PERIOD_SECONDS)

    def start(self):
        # Do not change the worker number. Since it's not possible to simply share a state between sanic processes, I omit this for now
        self._app.run(workers=1, port=8080)


if __name__ == "__main__":
    server = Server()

    server.start()

Я получаю следующую ошибку:

RuntimeError: Task <Task pending coro=<Server.refresh_context.<locals>.wrapper.<locals>.refresh_facts() running at test.py:54> cb=[run_until_complete.<locals>.<lambda>()]> got Future <Future pending> attached to a different loop

Чего мне не хватает? Я передаю приложение Sani c l oop клиенту для создания ClientSession, поэтому не понимаю, почему оно подключено к другому l oop. Я подозреваю нежелательное поведение при создании Процесса. Но это единственный способ, с помощью которого я нашел sh контекст приложения в фоновом режиме.

Спасибо за вашу помощь.

Алексис

РЕДАКТИРОВАТЬ 1: asyncio.run создает новый l oop, поэтому l oop в новом процессе отличается от сеанса клиента. Поэтому мне удалось заставить все работать с:

async def refresh_facts():
                loop = asyncio.get_running_loop()
                client = await Client.create(loop)
                facts = await client.fetch_cats_facts()
                q.put(facts)
asyncio.run(refresh_facts())

Однако выборка замедляет ответ моего сервера, в то время как вся идея использования Процесса состояла в том, чтобы обновить sh контекст в фоновом режиме. Чего мне не хватает?

...