Фэй-паутина держится до конца блока - PullRequest
1 голос
/ 15 января 2020

Когда я отправляю сообщения через сервер faye-websocket в качестве промежуточного программного обеспечения, я обнаруживаю, что сообщение не отправляется до тех пор, пока не закончится блок.

Вот несколько примеров, которые я пробовал:

Нет потоков

require 'faye/websocket'
require 'eventmachine'
require 'json'

Faye::WebSocket.load_adapter('thin')

module SocketTest
  class Websocket

    def initialize(app)
      @app     = app
    end

    def long_function()
      sleep 20
      "foo"
    end    

    def call(env)
      EM.run {
        if Faye::WebSocket.websocket?(env)
          ws = Faye::WebSocket.new(env, nil, {ping: 15 })
          ws.on :open do |event|
            response = {
              :responseCode => 100,
              :message => "Connection opened"
            }
            $logger.info "< #{response.to_json}"
            ws.send(response.to_json)
          end

          ws.on :message do |event|
            response = {
              :responseCode => 100,
              :message => "Received request, running slow function"
            }
            ws.send(response.to_json)
            $logger.info "< #{response.to_json}"
            long_function_result = long_function()
            response = {
              :responseCode => 200,
              :message => "Long function ran, result is #{long_function_result}"
            }
            ws.send(response.to_json)
            $logger.info "< #{response.to_json}"
            ws.close()
          end
          ws.on :close do |event|
            $logger.info "< CLOSE"
            ws = nil
          end

          # Return async Rack response
          ws.rack_response

        else
          @app.call(env)
        end
      }
    end    
  end
end

Chrome выход

5:25:09pm   WebSocket Connection Established
5:25:09pm   {"request":"test"}
5:25:09pm   {"responseCode":100,"message":"Connection opened"}
5:25:30pm   {"responseCode":100,"message":"Received request, running slow function"}
5:25:30pm   {"responseCode":200,"message":"Long function ran, result is foo"}
5:25:30pm   Connection Close Frame
5:25:30pm   Connection Close Frame

Консольный вывод

I, [2020-01-15T17:25:10.006006 #25394]  INFO -- : < {"responseCode":100,"message":"Connection opened"}
I, [2020-01-15T17:25:10.024170 #25394]  INFO -- : < {"responseCode":100,"message":"Received request, running slow function"}
I, [2020-01-15T17:25:30.034885 #25394]  INFO -- : < {"responseCode":200,"message":"Long function ran, result is foo"}I, [2020-01-15T06:25:30.189606 #25394]  INFO -- : < CLOSE

Как мы Вы можете видеть выше, что вывод логгера является немедленным (17:25:09) для «Полученного запроса, медленная функция», однако дополнительные 20 секунд тратятся на отправку ответа клиенту. Я заметил то же самое с сообщениями PING / PONG - с сервера WebSocket ничего не приходит, пока не завершится блокировка sleep.

Я также попробовал модифицированную версию, подобную этой:

Отрезание резьбы

# Just the call function, all other parts are the same
    def call(env)
      EM.run {
        if Faye::WebSocket.websocket?(env)
          ws = Faye::WebSocket.new(env, nil, {ping: 15 })
          ws.on :open do |event|
            response = {
              :responseCode => 100,
              :message => "Connection opened"
            }
            $logger.info "< #{response.to_json}"
            ws.send(response.to_json)
          end

          ws.on :message do |event|
            response = {
              :responseCode => 100,
              :message => "Received request, running slow function"
            }
            ws.send(response.to_json)
            $logger.info "< #{response.to_json}"
            long_function = Thread.new { long_function() }
            until (long_function.alive? == false) do
              response = {
                :responseCode => 100,
                :message => "Waiting for long function to complete"
              }
              ws.send(response.to_json)
              $logger.info "< #{response.to_json}"
              sleep 5

            end
            response = {
              :responseCode => 200,
              :message => "Long function ran, result is #{long_function.value}"
            }
            ws.send(response.to_json)
            $logger.info "< #{response.to_json}"
            ws.close()
          end
          ws.on :close do |event|
            $logger.info "< CLOSE"
            ws = nil
          end

          # Return async Rack response
          ws.rack_response

        else
          @app.call(env)
        end
      }
    end    

Chrome выход

5:34:33pm   WebSocket Connection Established
5:34:33pm   {"request":"test"}
5:34:33pm   {"responseCode":100,"message":"Connection opened"}
5:34:53pm   {"responseCode":100,"message":"Received request, running slow function"}
5:34:53pm   {"responseCode":100,"message":"Waiting for long function to complete"}
5:34:53pm   {"responseCode":100,"message":"Waiting for long function to complete"}
5:34:53pm   {"responseCode":100,"message":"Waiting for long function to complete"}
5:34:53pm   {"responseCode":100,"message":"Waiting for long function to complete"}
5:34:53pm   {"responseCode":200,"message":"Long function ran, result is foo"}
5:34:53pm   Connection Close Frame
5:34:53pm   Connection Close Frame

Консольный вывод

I, [2020-01-15T17:34:33.473295 #25729]  INFO -- : < {"responseCode":100,"message":"Connection opened"}
I, [2020-01-15T17:34:33.489433 #25729]  INFO -- : < {"responseCode":100,"message":"Received request, running slow function"}
I, [2020-01-15T17:34:33.489638 #25729]  INFO -- : < {"responseCode":100,"message":"Waiting for long function to complete"}
I, [2020-01-15T17:34:38.490995 #25729]  INFO -- : < {"responseCode":100,"message":"Waiting for long function to complete"}
I, [2020-01-15T17:34:43.491927 #25729]  INFO -- : < {"responseCode":100,"message":"Waiting for long function to complete"}
I, [2020-01-15T17:34:48.497281 #25729]  INFO -- : < {"responseCode":100,"message":"Waiting for long function to complete"}
I, [2020-01-15T17:34:53.499446 #25729]  INFO -- : < {"responseCode":200,"message":"Long function ran, result is foo"}
I, [2020-01-15T17:34:53.632997 #25729]  INFO -- : < CLOSE

Результат опять то же самое, хотя объект $logger продолжает работать, ожидая результата потока (указывая на то, что мы больше не блокируем), кажется, что WebSocket все еще удерживает и ждет своего буфера. Как я могу форсировать этот буфер?

1 Ответ

1 голос
/ 20 января 2020

Используя функцию add_timer EventMachine, предложенную @max_pleaner, я смог создать рабочую версию кода с внутренними обратными вызовами функции для создания al oop:

require 'faye/websocket'
require 'eventmachine'
require 'json'

Faye::WebSocket.load_adapter('thin')

module SocketTest
  class Websocket

    def initialize(app)
      @app     = app
    end

    def call(env)
      EM.run {
        if Faye::WebSocket.websocket?(env)
          ws = Faye::WebSocket.new(env, nil, {ping: 15 })
          ws.on :open do |event|
            response = {
              :responseCode => 100,
              :message => "Connection opened"
            }
            $logger.info "< #{response.to_json}"
            ws.send(response.to_json)
          end

          ws.on :message do |event|
            response = {
              :responseCode => 100,
              :message => "Received request, running slow function"
            }
            ws.send(response.to_json)
            $logger.info "< #{response.to_json}"
            total_runs = 4
            def long_function(ws, count, total_runs)
              if count > total_runs then
                # Error out
                puts "Reached full count, exiting"
                ws.close()
                return
              end
              # Logic here
              if count == 3 then
                response = {
                  :responseCode => 200,
                  :message => "Long function ran"
                }
                ws.send(response.to_json)
                $logger.info "< #{response.to_json}"
                ws.close()
              else
                response = {
                  :responseCode => 100,
                  :message => "Waiting for long function to complete"
                }
                ws.send(response.to_json)
                $logger.info "< #{response.to_json}"
                EventMachine.add_timer(5) {
                  long_function(ws, count + 1, total_runs)
                }
              end
            end
            long_function(ws, 1, total_runs)
          end
          ws.on :close do |event|
            $logger.info "< CLOSE"
            ws = nil
          end

          # Return async Rack response
          ws.rack_response

        else
          @app.call(env)
        end
      }
    end    
  end
end
...