Python: получение очереди и отправка сообщений с помощью Flask-socketio. Возможно ли это при использовании функции True True? - PullRequest
0 голосов
/ 19 июня 2019

У меня есть некоторый рабочий код, который получает данные из очереди, обрабатывает их и затем передает данные через Flask-socketio в браузер.

Это работает, когда не так много отправляемых сообщений, однако, когда рабочая нагрузка увеличивается, она просто не справляется.

Кроме того, я попытался обработать очередь без передачи, и, похоже, это работает правильно.

То, что я надеялся сделать, это вместо того, чтобы излучать каждый раз, когда запускается queue.get (), просто излучать любой текущий набор данных на каждом пинге.

Теоретически это должно означать, что даже если queue.get () запускается несколько раз между ping и pong, отправляемые сообщения должны оставаться постоянными и не перегружать систему.

Это, конечно, означает, что некоторые данные не будут отправлены, но самые последние данные по состоянию на последний пинг должны быть отправлены, что достаточно для того, что мне нужно.

Надеюсь, в этом есть смысл, так что (в некотором роде) код ...

Это работает, когда не так много отправляемых сообщений (необходим режим socketio sleep, в противном случае обработка не происходит до того, как он попытается выполнить отправку):

def handle_message(*_args, **_kwargs):

    try:
        order_books = order_queue.get(block=True, timeout=1)
    except Empty:
        order_books = None
        market_books = market_queue.get()

    uo = update_orders(order_books, mkt_runners, profitloss, trading, eo, mb)
    update_market_book(market_books, mb, uo, market_catalogues, mkt_runners, profitloss, trading)
    socketio.sleep(0.2)
    emit('initial_market', {'message': 'initial_market', 'mb': json.dumps(mb), 'ob': json.dumps(eo)})
    socketio.sleep(0.2)
    emit('my_response2', {'message': 'pong'})

def main():
    socketio.run(app, debug=True, port=3000)
    market_stream.stop()
    order_stream.stop()

if __name__ == '__main__':
    main()

Это работает (но я не пытаюсь отправлять здесь какие-либо сообщения, это всего лишь скрипт Python, получаемый из очереди и обрабатывающий):

while True:
    try:
        order_books = order_queue.get(block=True, timeout=1)
    except Empty:
        order_books = None
        market_books = market_queue.get()

    uo = update_orders(order_books, mkt_runners, profitloss, trading, eo, mb)
    update_market_book(market_books, mb, uo, market_catalogues, mkt_runners, profitloss, trading)
    print(mb)

(МБ в конце является текущим набором данных, который возвращается функцией update_market_book).

Теперь я надеялся, что, имея While True в конце, это может просто запуститься, и функция будет возвращать последний набор данных на каждом пинге, однако, используя вышеизложенное, While True срабатывает только тогда, когда main функция отключена ... которая, конечно, останавливает работу секции разъема.

Итак, есть ли способ, которым я могу комбинировать оба этих способа, чтобы достичь того, что я пытаюсь сделать, и / или есть альтернативный метод, который я не рассматривал, который мог бы работать?

Как всегда, я ценю ваш совет, и если вопрос не ясен, пожалуйста, дайте мне знать, чтобы я мог уточнить любые биты.

Большое спасибо!

Просто добавляем трассировку стека в соответствии с запросом:

Traceback (most recent call last):
  File "D:\Python37\lib\site-packages\betfairlightweight\endpoints\login.py", line 38, in request
    response = session.post(self.url, data=self.data, headers=self.client.login_headers, cert=self.client.cert)
  File "D:\Python37\lib\site-packages\requests\api.py", line 116, in post
    return request('post', url, data=data, json=json, **kwargs)
  File "D:\Python37\lib\site-packages\requests\api.py", line 60, in request
    return session.request(method=method, url=url, **kwargs)
  File "D:\Python37\lib\site-packages\requests\sessions.py", line 533, in request
    resp = self.send(prep, **send_kwargs)
  File "D:\Python37\lib\site-packages\requests\sessions.py", line 646, in send
    r = adapter.send(request, **kwargs)
  File "D:\Python37\lib\site-packages\requests\adapters.py", line 449, in send
    timeout=timeout
  File "D:\Python37\lib\site-packages\urllib3\connectionpool.py", line 600, in urlopen
    chunked=chunked)
  File "D:\Python37\lib\site-packages\urllib3\connectionpool.py", line 343, in _make_request
    self._validate_conn(conn)
  File "D:\Python37\lib\site-packages\urllib3\connectionpool.py", line 839, in _validate_conn
    conn.connect()
  File "D:\Python37\lib\site-packages\urllib3\connection.py", line 332, in connect
    cert_reqs=resolve_cert_reqs(self.cert_reqs),
  File "D:\Python37\lib\site-packages\urllib3\util\ssl_.py", line 279, in create_urllib3_context
    context.options |= options
  File "D:\Python37\lib\ssl.py", line 507, in options
    super(SSLContext, SSLContext).options.__set__(self, value)
  File "D:\Python37\lib\ssl.py", line 507, in options
    super(SSLContext, SSLContext).options.__set__(self, value)
  File "D:\Python37\lib\ssl.py", line 507, in options
    super(SSLContext, SSLContext).options.__set__(self, value)
  [Previous line repeated 489 more times]
RecursionError: maximum recursion depth exceeded while calling a Python object
...