Почему в этом примере нет конфликта между Boost Interprocess message_queue? - PullRequest
0 голосов
/ 21 января 2019

Я пытаюсь найти хорошую схему именования для Boost Interprocess message_queue. Предположим, что есть группа процессов, использующих одну и ту же очередь. Может быть несколько одновременных «прогонов» этой группы процессов. Я предполагаю, что для каждого запуска необходимо использовать уникальное имя в очереди, которое используется в группе процессов. Следующий код прототипа отправляет кучу сообщений серверному процессу, который читает сообщения из очереди и затем печатает их:

#include <boost/archive/binary_iarchive.hpp>
#include <boost/archive/binary_oarchive.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/serialization/string.hpp>

#include <iostream>
#include <vector>

#include <unistd.h>

namespace ipc = boost::interprocess;

constexpr unsigned messages{1000000};
constexpr unsigned maxMessageSize{8192};

struct Message
{
  std::string prefix;

  template<typename Archive>
  void serialize(Archive& ar, const unsigned int)
  {
    // clang-format off
    ar & prefix;
    // clang-format on
  }
};

int loggerServerMain()
{
  try
  {
    // Open a message queue.
    ipc::message_queue mq(ipc::open_only, "message_queue");

    unsigned int priority;
    ipc::message_queue::size_type recvd_size;

    for (int i = 0; i < messages; ++i)
    {
      std::string data;
      data.resize(maxMessageSize);

      mq.receive(&data[0], maxMessageSize, recvd_size, priority);
      if (recvd_size > maxMessageSize)
        return 1;

      std::istringstream iss{data};
      boost::archive::binary_iarchive ia{iss};

      Message message;
      ia >> message;

      std::cout << message.prefix << '\n';
    }
  }
  catch (const ipc::interprocess_exception& ex)
  {
    std::cout << ex.what() << std::endl;
    return 1;
  }
  ipc::message_queue::remove("message_queue");
  return 0;
}

int main()
{
  try
  {
    ipc::message_queue::remove("message_queue");
    ipc::message_queue mq(ipc::create_only, "message_queue", 100, maxMessageSize);

    if (fork() == 0)
    {
      exit(loggerServerMain());
    }

    for (int i = 0; i < messages; ++i)
    {
      Message message{"lib" + std::to_string(i)};

      std::ostringstream oss;
      boost::archive::binary_oarchive oa{oss};
      oa << message;

      std::string data{oss.str()};
      mq.send(data.data(), data.size(), 0);
    }
  }
  catch (const ipc::interprocess_exception& ex)
  {
    std::cout << ex.what() << std::endl;
    return 1;
  }

  return 0;
}

Итак, у нас есть процесс, который разветвляет сервер, а затем пересылает на сервер целую кучу сообщений, которые, в свою очередь, будут печатать сообщения. Мы используем жестко закодированное имя 'message_queue' для имени основной очереди сообщений.

Я был удивлен, обнаружив, что этот пример все еще работает, когда несколько экземпляров этого процесса запускаются одновременно, например, следующим образом:

./message_queue > 1.log & ; sleep 1 ; ./message_queue > 2.log

Оба журнала содержат все сообщения в правильном порядке. Первый экземпляр message_queue все еще работает, когда запускается второй. Оба журнала создаются и записываются одновременно.

Как может случиться так, что нет конкуренции за очередь сообщений при одновременном использовании из нескольких групп процессов? Я предположил, что имя очереди сообщений является глобальным, определенным на системном уровне, но из моего примера кажется, что оно как-то определено для дерева процессов? Я копался в документации Boost Interprocess, но не нашел упоминаний об этом.

Я использую Linux 4.20, GCC 8.1.0. Пример кода скомпилирован следующим образом:

g++ -std=c++17 -O3 -o message_queue message_queue.cpp -lpthread -lboost_serialization -lrt
...