Не получать уведомления с помощью pqxx :: messages_receiver - PullRequest
0 голосов
/ 29 января 2020

Я установил триггер и проверил, используя pgadmin4 (используя LISTEN), который изменяет мою таблицу, получая уведомления. Ожидается, что веб-интерфейс может вносить изменения в настройки, и мой бэкэнд-код должен принять эти изменения и применить.

Я создал небольшой класс (функции встроены), расширяющий класс получателя уведомлений.

    class SettingsChangeListener: public pqxx::notification_receiver
{
public:
  SettingsChangeListener(pqxx::connection_base &c);
  virtual void operator()(const std::string &payload, int backend_pid)
  throw ();

  void RegisterParentData(SINT32 queueId, UINT32 eventID);

private:
  SINT32 m_parentQueueHandle; // handle to the parent's message queue
  UINT32 m_eventID; // event to send to the parent on notification
};

inline SettingsChangeListener::SettingsChangeListener(pqxx::connection_base &c)
   : pqxx::notification_receiver(c, "settings_changed"), m_parentQueueHandle(0), m_eventID(0)
{

}

inline void SettingsChangeListener::operator()(const std::string &payload, int backend_pid)
throw ()
{
   if (m_parentQueueHandle > 0)
   {
      LOG_SYSTEM_INFO("Detected Change in Settings\n");
      // don't need the notification text - notify parent that the data has changed
      CmnMessage msg;
      msg.SetMessageID(m_eventID);
      msg.SetMessageType(MESSAGE_TYPE_Local);
      msg.SetMessageSource(MODULE_DATABASE);

      MsgQueueMessage msgToSend;
      size_t size = msg.Export(msgToSend);

      if (FAIL == msgsnd(m_parentQueueHandle, &msgToSend, size, IPC_NOWAIT))
      {
         LOG_SYSTEM_ERROR("FAILED to send message on queue : %s\n", strerror(errno));
      }
   }
}

inline void SettingsChangeListener::RegisterParentData(SINT32 queueId, UINT32 eventID)
{
   m_parentQueueHandle = msgget(queueId, IPC_CREAT | MSGQ_CREAT_PERMISSION);
   m_eventID = eventID;
}

В одном из моих других классов я включил член SettingsChangeListener и инициализировал в конструкторе этот класс.

MyClass::MyClass()
    : m_connection(DB_CONNECTION_STRING.c_str()), m_settingsDbAdapter(m_connection), m_settingsChangeListener(m_connection)

{}

Соединение и член сохраняются в течение всего срока действия приложения. Адаптер БД - это класс, который управляет таблицей настроек (добавление / обновление / удаление / получение) и работает без проблем.

Я ставлю точку останова в операторе () (), и она никогда не срабатывает, и я никогда не видеть уведомление. Я установил точку останова и подтвердил, что регистрация родительских данных произошла, как и ожидалось.

Я впервые использую pqxx, и не все функции кажутся простыми для реализации. Я думаю, что есть что-то простое, что мне не хватает, но я просто не могу его найти.

1 Ответ

0 голосов
/ 30 января 2020

Была часть, которую мне не хватало в примере кода. Эта функциональность должна существовать в контексте потока, и необходимо следующее. Это было взято дословно из примера. Я урежу части, которые я не использую. В общем, pselect был, вероятно, ключом к тому, чтобы заставить это работать.

   while (true)
   {
      /* Step #4.  Create set of file descriptors to pass to 'pselect()'.
       * You can put anything in here that you want (files, sockets, serial ports,
       * descriptors from other sources).
       *
       * IMPORTANT: If you want to catch 'NOTIFY' events, you MUST add the
       * 'db.sock()' descriptor!  This is the socket for the database connection,
       * where notifications from the database backend come in.
       */
      fd_set read_fds;
      FD_ZERO(&read_fds);
      FD_SET(STDIN_FILENO, &read_fds);  // optional (just part of the demo)
      FD_SET(db.sock(), &read_fds);    // postgres connection (mandatory)
      const int max_fd = db.sock() + 1;

      /* Optional timeout (in case your process wanted to take action if no events
       * occur for a period of time).
       */
      struct timespec ts;
      ts.tv_sec = 5;
      ts.tv_nsec = 0;

      /* Step #5.  Magic happens here!  Put process to sleep until something
       * "interesting" happens.
       *
       * Unix was built to do "synchronous IO multiplexing".
       *
       * pselect() can return the following information values:
       * -1 = pselect() failed or was interrupted (errno == EINTR).
       *  0 = timeout expired (timeout is optional).
       * >1 = count of file descriptors in any of the sets that have activity.
       *
       * If the return value is positive, pselect() will have modified the file
       * descriptor sets: descriptors with activity will remain set, but all
       * others will be cleared.
       *
       * pselect() can return "failure" (-1) for only one acceptable reason: it
       * was interrupted by a signal.  For this demo, we assume that that signal
       * is SIGCHLD.  Any other errno value is beyond the scope of the demo.
       */
      switch (pselect(max_fd, &read_fds, NULL, NULL, &ts, &origmask))
      {
         case -1:
            if (errno != EINTR) throw CStandardError("pselect");

            /* Interrupted by signal, which we assume is SIGCHLD.
             *
             * We reap children here, but without updating read_fds.  So don't
             * re-use it when we're done reaping.
             */
            reap_children(db);
            break;

         case 0:
            // No file descriptors ready; we got here through timeout.
            std::cout << "pselect() timeout" << std::endl;
            break;

         default:
            // Check for, and handle, incoming data on stdin.
            if (FD_ISSET(STDIN_FILENO, &read_fds))
               handle_stdin_data();

            // Check for, and handle, notifications.  Calls MyNoticer::operator().
            if (FD_ISSET(db.sock(), &read_fds))
               db.get_notifs();

            break;
      }
   }
...