Я испытываю длительную (3 часа) задержку (EDIT: сначала задержка короткая, а затем увеличивается в течение дня) при обработке данных, передаваемых с сервера веб-сокетов на мой client.py.Я знаю, что он не задерживается сервером.
Например, каждые 5 секунд я вижу событие keep_alive и соответствующую временную метку.Так что все идет гладко. Но когда я вижу, что обработанный в журналах фрейм данных фактически равен 3 часам после , когда сервер отправил его. Я что-то делаю, чтобы отложить этот процесс?
Правильно ли я называю свою сопрограмму keep_alive?keep_alive - это просто сообщение серверу, чтобы поддерживать соединение живым.Сервер возвращает сообщение обратно.Кроме того, я вхожу слишком много?Может ли это задерживать обработку (я так не думаю, так как вижу, что события регистрации происходят сразу).
async def keep_alive(websocket):
"""
This only needs to happen every 30 minutes. I currently have it set to every 5 seconds.
"""
await websocket.send('Hello')
await asyncio.sleep(5)
async def open_connection_test():
"""
Establishes web socket (WSS). Receives data and then stores in csv.
"""
async with websockets.connect(
'wss://{}:{}@localhost.urlname.com/ws'.format(user,pswd), ssl=True, ) as websocket:
while True:
"""
Handle message from server.
"""
message = await websocket.recv()
if message.isdigit():
# now = datetime.datetime.now()
rotating_logger.info ('Keep alive message: {}'.format(str(message)))
else:
jasonified_message = json.loads(message)
for key in jasonified_message:
rotating_logger.info ('{}: \n\t{}\n'.format(key,jasonified_message[key]))
"""
Store in a csv file.
"""
try:
convert_and_store(jasonified_message)
except PermissionError:
convert_and_store(jasonified_message, divert = True)
"""
Keep connection alive.
"""
await keep_alive(websocket)
"""
Logs any exceptions in logs file.
"""
try:
asyncio.get_event_loop().run_until_complete(open_connection())
except Exception as e:
rotating_logger.info (e)
РЕДАКТИРОВАТЬ: Из документации - я думаю, что это может иметь какое-то отношение к нему - но я не подключил точки.
Параметр max_queue устанавливает максимальную длину очереди, в которой хранятся входящие сообщения.Значение по умолчанию - 32. 0 отключает ограничение.Сообщения добавляются в очередь в памяти при получении;затем recv () выскакивает из этой очереди.Чтобы предотвратить чрезмерное потребление памяти, когда сообщения принимаются быстрее, чем они могут быть обработаны, очередь должна быть ограничена.Если очередь заполняется, протокол прекращает обработку входящих данных, пока не будет вызвана функция recv ().В этой ситуации различные приемные буферы (по крайней мере, в asyncio и в ОС) будут заполняться, затем окно приема TCP будет сокращаться, замедляя передачу, чтобы избежать потери пакетов.
РЕДАКТИРОВАТЬ 9/28/ 2018: я тестирую его без сообщения keep-alive, и это, похоже, не проблема.Может ли это быть связано с функцией convert_and_store ()?Должен ли это быть async def и затем также ожидаться?
def convert_and_store(data, divert = False, test = False):
if test:
data = b
fields = data.keys()
file_name = parse_call_type(data, divert = divert)
json_to_csv(data, file_name, fields)
РЕДАКТИРОВАТЬ 01.10.2008: кажется, что оба сообщения keep-alive и convert_and_store являются предметом спора;если я расширю сообщение keep-alive до 60 секунд - то convert_and_store будет запускаться только один раз в 60 секунд .Итак, convert_and_store ожидает keep_alive () ...