Rails & postgresql, уведомление / прослушивание при создании новой записи на панели администратора без условий гонки - PullRequest
1 голос
/ 28 марта 2020

У меня есть панель администратора, где я хочу, чтобы оповещение срабатывало при создании пользователя (на отдельной странице). Код ниже работает, однако, есть условие гонки. Если 2 пользователя созданы очень близко друг к другу, он срабатывает только один раз.

class User < ApplicationRecord
  after_commit :notify_creation, on: :create

  def notify_creation
    ActiveRecord::Base.connection_pool.with_connection do |connection|
      self.class.execute_query(connection, ["NOTIFY user_created, '?'", id])
    end
  end

  def self.listen_to_creation
    ActiveRecord::Base.connection_pool.with_connection do |connection|
      begin
        execute_query(connection, ["LISTEN user_created"])
        connection.raw_connection.wait_for_notify do |event, pid, id|
          yield id
        end
      ensure
        execute_query(connection, ["UNLISTEN user_created"])
      end
    end
  end

  def self.clean_sql(query)
    sanitize_sql(query)
  end

  private

  def self.execute_query(connection, query)
    sql = self.clean_sql(query)
    connection.execute(sql)
  end
end

class AdminsController < ApplicationController
  include ActionController::Live

  def update
    response.headers['Content-Type'] = 'text/event-stream'
    sse = SSE.new(response.stream, event: 'notice')
    begin
      User.listen_to_creation do |user_id|
        sse.write({user_id: user_id})
      end
    rescue ClientDisconnected
    ensure
      sse.close
    end
  end
end

Я делаю это впервые, поэтому я следовал этому учебнику , который, как и большинство учебников, ориентирован на обновление одной записи, а не на прослушивание всей таблицы для новое творение.

1 Ответ

0 голосов
/ 04 апреля 2020

Это происходит потому, что вы отправляете только одно обновление одновременно, а затем запрос заканчивается. Если вы делаете запрос на обновление AdminsController #. У вас есть один подписчик, ожидающий вашего уведомления. Посмотрите на этот блок

begin
  execute_query(connection, ["LISTEN user_created"])
  connection.raw_connection.wait_for_notify do |event, pid, id|
    yield id
  end
ensure
  execute_query(connection, ["UNLISTEN user_created"])
end

Как только вы получите одно уведомление, блок сдается, а затем вы закрываете канал. Поэтому, если вы полагаетесь на то, что веб-интерфейс сделает еще одну попытку подключения после получения результата, если запись будет создана до того, как вы снова начнете прослушивать канал в новом соединении, вы не получите уведомление, так как не было подключенного прослушивателя. до Postgres в то время.

Это типичная проблема в любой системе уведомлений в реальном времени. В идеале вы хотели бы, чтобы канал внешнего интерфейса (Websocket, SSE или даже LongPolling) всегда был открыт. Если вы получаете новый элемент, вы отправляете его во внешний интерфейс с помощью этого канала, и в идеале вы должны держать этот канал открытым, как в случае с Websockets и SSE. Прямо сейчас вы относитесь к своему SSE-соединению как к длительному опросу.

Таким образом, ваш код должен выглядеть примерно так:

# Snippet 2
  def self.listen_to_creation
    ActiveRecord::Base.connection_pool.with_connection do |connection|
      begin
        execute_query(connection, ["LISTEN user_created"])
        loop do 
          connection.raw_connection.wait_for_notify do |event, pid, id|
            yield id
          end
        end
      ensure
        execute_query(connection, ["UNLISTEN user_created"])
      end
    end
  end

Но это столкнется с проблемой, когда он будет поддерживать поток живым навсегда, даже если соединение закрыто до тех пор, пока некоторые данные не попадут в поток и не обнаружат ошибку во время обратной записи. Вы можете либо запустить его фиксированное число раз с короткими интервалами уведомления, либо добавить к нему разновидность звукового удара. Есть два простых способа сделать сердцебиение. Я добавлю их в качестве быстрых хак-кодов.

# Snippet 3
def self.listen_to_creation(heartbeat_interval = 10)
    ActiveRecord::Base.connection_pool.with_connection do |connection|
      begin
        execute_query(connection, ["LISTEN user_created"])
        last_hearbeat = Time.now
        loop do 
          connection.raw_connection.wait_for_notify(heartbeat_interval) do |event, pid, id|
            yield({id: id})
          end
          if Time.now - last_heartbeat >= heartbeat_interval
            yield({heartbeat: true})
            last_heartbeat = Time.now
          end
        end
      ensure
        execute_query(connection, ["UNLISTEN user_created"])
      end
    end
  end

В приведенном выше примере вы, по крайней мере, будете посылать что-то в канал каждую секунду heartbeat_interval. Таким образом, если труба закрывается, она должна выйти из строя и закрыть трубу, освобождая тем самым нить.

Этот подход добавляет в модель логи, связанные с контроллером c, и, если вы хотите удерживать уведомление postgres без временного интервала, другая вещь, которую вы можете сделать, чтобы сделать биение, это просто запустить нить в самом контроллере. Запустите поток в методе контроллера, который спит для heartbeat_interval и пишет sse.write({heartbeat: true}) после пробуждения. В этом случае вы можете оставить код модели таким же, как Snippet 2.

Кроме того, я добавил другие вещи, которые нужно посмотреть с SSE с Puma & Rails, в ответ на ваш другой вопрос:

...