Длительная задержка в использовании asyncio и websockets в Python 3 - PullRequest
0 голосов
/ 24 сентября 2018

Я испытываю длительную (3 часа) задержку (EDIT: сначала задержка короткая, а затем увеличивается в течение дня) при обработке данных, передаваемых с сервера веб-сокетов на мой client.py.Я знаю, что он не задерживается сервером.

Например, каждые 5 секунд я вижу событие keep_alive и соответствующую временную метку.Так что все идет гладко. Но когда я вижу, что обработанный в журналах фрейм данных фактически равен 3 часам после , когда сервер отправил его. Я что-то делаю, чтобы отложить этот процесс?

Правильно ли я называю свою сопрограмму keep_alive?keep_alive - это просто сообщение серверу, чтобы поддерживать соединение живым.Сервер возвращает сообщение обратно.Кроме того, я вхожу слишком много?Может ли это задерживать обработку (я так не думаю, так как вижу, что события регистрации происходят сразу).

async def keep_alive(websocket):
                """
                 This only needs to happen every 30 minutes. I currently have it set to every 5 seconds.
                """
                await websocket.send('Hello')   
                await asyncio.sleep(5)

async def open_connection_test():
    """
    Establishes web socket (WSS). Receives data and then stores in csv.
    """
    async with websockets.connect( 
            'wss://{}:{}@localhost.urlname.com/ws'.format(user,pswd), ssl=True, ) as websocket:
        while True:    
            """
            Handle message from server.
            """
            message = await websocket.recv()
            if message.isdigit():
                # now = datetime.datetime.now()
                rotating_logger.info ('Keep alive message: {}'.format(str(message)))
            else:
                jasonified_message = json.loads(message)
                for key in jasonified_message:
                    rotating_logger.info ('{}: \n\t{}\n'.format(key,jasonified_message[key]))    
                """
                Store in a csv file.
                """
                try:            
                    convert_and_store(jasonified_message)
                except PermissionError:
                    convert_and_store(jasonified_message, divert = True)                        
            """
            Keep connection alive.
            """            
            await keep_alive(websocket)

"""
Logs any exceptions in logs file.
"""
try:
    asyncio.get_event_loop().run_until_complete(open_connection())
except Exception as e:
    rotating_logger.info (e)

РЕДАКТИРОВАТЬ: Из документации - я думаю, что это может иметь какое-то отношение к нему - но я не подключил точки.

Параметр max_queue устанавливает максимальную длину очереди, в которой хранятся входящие сообщения.Значение по умолчанию - 32. 0 отключает ограничение.Сообщения добавляются в очередь в памяти при получении;затем recv () выскакивает из этой очереди.Чтобы предотвратить чрезмерное потребление памяти, когда сообщения принимаются быстрее, чем они могут быть обработаны, очередь должна быть ограничена.Если очередь заполняется, протокол прекращает обработку входящих данных, пока не будет вызвана функция recv ().В этой ситуации различные приемные буферы (по крайней мере, в asyncio и в ОС) будут заполняться, затем окно приема TCP будет сокращаться, замедляя передачу, чтобы избежать потери пакетов.

РЕДАКТИРОВАТЬ 9/28/ 2018: я тестирую его без сообщения keep-alive, и это, похоже, не проблема.Может ли это быть связано с функцией convert_and_store ()?Должен ли это быть async def и затем также ожидаться?

def convert_and_store(data, divert = False, test = False):
    if test:
        data = b
    fields = data.keys()
    file_name =  parse_call_type(data, divert = divert)
    json_to_csv(data, file_name, fields)

РЕДАКТИРОВАТЬ 01.10.2008: кажется, что оба сообщения keep-alive и convert_and_store являются предметом спора;если я расширю сообщение keep-alive до 60 секунд - то convert_and_store будет запускаться только один раз в 60 секунд .Итак, convert_and_store ожидает keep_alive () ...

Ответы [ 3 ]

0 голосов
/ 30 сентября 2018

Я думаю, это было бы более понятно, используйте ThreadPoolExecutor, чтобы код блокировки работал в фоновом режиме

from concurrent.futures import ThreadPoolExecutor
pool = ThreadPoolExecutor(max_workers=4)

def convert_and_store(data, divert=False, test=False):
    loop = asyncio.get_event_loop()
    loop.run_in_executor(pool, _convert_and_store, divert, test)


def _convert_and_store(data, divert=False, test=False):
    if test:
        data = b
    fields = data.keys()
    file_name = parse_call_type(data, divert=divert)
    json_to_csv(data, file_name, fields)

asyncio send keep keep msg demo

async def kepp_alive(websocket):
    while True:
        await websocket.send_str(ping)
        await asyncio.sleep(10)
0 голосов
/ 02 октября 2018

Вы должны начать новый поток для этой функции keep_alive().

Для async-await обещает, что все задачи были выполнены, прежде чем перейти к следующему шагу.

Таким образом, await keep_alive(websocket) фактически блокирует поток в этом смысле.Вы можете не ждать здесь keep_alive, так что процесс может продолжаться, но наверняка это не то, что вы хотите, я уверен.

На самом деле, вам нужны два периода времени, один для общения ссервер, один для поддержки сервера.Они должны быть разделены, так как они находятся в разных сопрограммах.

Таким образом, правильный способ - использовать Thread, и в этом случае не нужно использовать asyncio, для простоты.

Сначала измените keep_alive() на следующее.

def keep_alive():
    """
        This only needs to happen every 30 minutes. I currently have it set to every 5 seconds.
    """
    while True:
        websocket.send('Hello') 
        time.sleep(1)

In open_connection_test()

async def open_connection_test():
    """
    Establishes web socket (WSS). Receives data and then stores in csv.
    """
    thread = threading.Thread(target=keep_alive, args=())
    thread.daemon = True   # Daemonize
    thread.start()
    async with websockets.connect(...) as websocket:
        ....
        #No need this line anymore.
        #await keep_alive(websocket) 
0 голосов
/ 30 сентября 2018

Может ли это быть связано с функцией convert_and_store ()?

Да, это может быть.Код блокировки не должен вызываться напрямую.Если функция выполняет вычисление с интенсивным использованием ЦП в течение 1 секунды, все асинхронные задачи и операции ввода-вывода будут задержаны на 1 секунду.

Исполнитель может использоваться для запуска кода блокировки в другом потоке / процессе:

import asyncio
import concurrent.futures
import time

def long_runned_job(x):
    time.sleep(2)
    print("Done ", x)

async def test():
    loop = asyncio.get_event_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        for i in range(5):
            loop.run_in_executor(pool, long_runned_job, i)
            print(i, " is runned")
            await asyncio.sleep(0.5)
loop = asyncio.get_event_loop()
loop.run_until_complete(test())

В вашем случае это должно выглядеть примерно так:

import concurrent.futures

async def open_connection_test():
    loop = asyncio.get_event_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        async with websockets.connect(...) as websocket:
            while True:    
                ...
                loop.run_in_executor(pool, convert_and_store, args)

EDITED :

Кажется, что оба-alive сообщение и convert_and_store оба в вопросе

Вы можете запустить keep_alive в фоновом режиме:

async def keep_alive(ws):
    while ws.open:
        await ws.ping(...)   
        await asyncio.sleep(...)

async with websockets.connect(...) as websocket:
    asyncio.ensure_future(keep_alive(websocket))
    while True:    
        ...
...