pika для сбоя rabbitMQ при использовании флеш-сервера - PullRequest
0 голосов
/ 30 декабря 2018

Итак, у нас работает однопотоковый колб-сервер, на который мы получаем запросы от клиента приложения Python.В этом флеш-сервере мы используем rabbitMQ с библиотекой pika для рассылки сообщений другим клиентам.Происходит следующее: в функции get происходит сбой программы с ошибкой:

pika.exceptions.ConnectionClosed: (505, 'UNEXPECTED_FRAME - ожидаемый заголовок содержимого для класса 60, полученный кадр заголовка без содержимоговместо этого ')

Я искал много тем об этом в переполнении стека и других, но все они решают проблемы с многопоточностью, которая не имеет место.Flask должен работать только с одним потоком, если он не вызывается в app.run (threadaded = yes).

Программа обычно дает сбой, когда несколько сообщений отправляются за короткий промежуток времени (например, 5 в секунду), и это также важноОтметим, что сообщения поступают каждую секунду с запросом к этой функции:

@app.route('/api/users/getMessages', methods=['POST'])  
def get_Messages():  
    data = json.loads(request.data)
    token = data['token']

    payload = jwt.decode(token, 'SECRET', algorithms=['HS256'])
    istid = payload['istid']
    print('istid: '+istid)

    messages = []

    queue = channel.queue_declare(queue=istid)
    for i in range(queue.method.message_count):
        method_frame, header_frame, body = channel.basic_get(queue=istid, no_ack=True)
        if method_frame:
            #print(method_frame, header_frame, body)
            messages.append(body)
        else:
            print('No message returned')

    res = {'messages':messages, 'error':0}
    return jsonify(res)

В этом коде обычно происходит сбой в строке:

queue = channel.queue_declare(queue=istid)

Но мы также пытались изменитькод, который нужно использовать вместо вместо для того, где он заканчивается, когда тело имеет значение None, и оно вылетает в строке:
method_frame, header_frame, body = channel.basic_get(queue=istid, no_ack=True) в этом случае.Также важно, что сбои являются случайными, и это может сработать несколько раз, а затем происходит случайный сбой после получения запроса во время отправки сообщений.Если кто-нибудь знает что-либо, связанное с этим, мы будем благодарны за любую помощь.

Еще одно замечание: мы подумали об использовании basic_consume с обратным вызовом вместо basic_get, но мы не нашли способ, как это будет работать, поскольку нам приходится отправлятьсообщения возвращаются, и несколько пользователей отправляют запросы на эту же функцию.

РЕДАКТИРОВАТЬ # 1: В документах rabbitMQ rabbitmq при поиске функции "def basic_get"«Вы заметите, что есть некоторые комментарии TODO, а также ссылка на этот

Из-за деталей реализации, это не может быть вызвано во второй раз, пока не будет выполнен обратный вызов.

Так что я подозревал, что это может быть тем, что происходит, но даже если это так, я не знаю, как это можно решить.

1 Ответ

0 голосов
/ 01 января 2019

Для всех, кто интересуется решением, так же как и в других комментариях, программа не была поточно-ориентированной, поскольку во флаке версии 1.0 по умолчанию используется threadaded = True.
Решение может быть либо:
1)запуск фляги с app.run (threadaded = False)
2) Обеспечение безопасности потока программы путем реализации блокировок при доступе к каналу / соединению с помощью pika.

...