Как я могу избежать ошибок времени исполнения каналов django при использовании сигналов? - PullRequest
0 голосов
/ 12 декабря 2018

У меня есть служба django (скажем, 'brisbane'), которая отправляет обновления в браузеры клиентов, когда модели баз данных сохраняются с использованием каналов и сигналов, таким образом:

def invalidated_serialized_model_receiver(self, sender, **kwargs):
    ...
    async_to_sync(get_channel_layer().group_send)(name, update)

Это работает и дает хорошие обновления в реальном времени.

'brisbane' теперь необходимо взаимодействовать с другим сервисом (тот же код) 'sydney', чтобы его можно было аналогичным образом обновлять в режиме реального времени в отношении изменений в sydney's данных.При этом используется потребитель, работающий в другом процессе, который выглядит примерно так:

async def remote_site_consume(site):
    socket_url = site.urn(protocol='ws', resource_name='/ws/watch/')
    async with websockets.connect(socket_url) as websocket:
        await websocket.send(json.dumps({'type': 'watch_messages'}))
        ...
        async for event in websocket:
            event = json.loads(event)
            await get_event_handler(event)(site, websocket, event)

Сигнал может быть отправлен законным образом из обработчика событий, в котором и возникает проблема.Когда это происходит, выдается RuntimeError

'Вы не можете использовать AsyncToSync в том же потоке, что и цикл асинхронных событий - просто ждите асинхронную функцию напрямую.'

Iне могу просто использовать await, потому что сигнал также посылается из потоков без цикла обработки событий.

1 Ответ

0 голосов
/ 12 декабря 2018

Я пытаюсь с помощью этой замены async_to_sync, которая, кажется, работает в локальных тестах, по крайней мере:

def invalidated_serialized_model_receiver(self, sender, **kwargs):
    ...
    create_or_use_loop(self.channel_layer.group_send(name, update))


def create_or_use_loop(awaitable):
    try:
        event_loop = asyncio.get_event_loop()
    except RuntimeError:
        event_loop = None

    if event_loop and event_loop.is_running():
        event_loop.call_soon_threadsafe(event_loop.create_task, awaitable)
    else:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(awaitable)
        finally:
            loop.close()
            asyncio.set_event_loop(event_loop)

Это выглядит очень неуклюже.

...