Я пытаюсь найти хорошую схему именования для Boost Interprocess message_queue
. Предположим, что есть группа процессов, использующих одну и ту же очередь. Может быть несколько одновременных «прогонов» этой группы процессов. Я предполагаю, что для каждого запуска необходимо использовать уникальное имя в очереди, которое используется в группе процессов. Следующий код прототипа отправляет кучу сообщений серверному процессу, который читает сообщения из очереди и затем печатает их:
#include <boost/archive/binary_iarchive.hpp>
#include <boost/archive/binary_oarchive.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/serialization/string.hpp>
#include <iostream>
#include <vector>
#include <unistd.h>
namespace ipc = boost::interprocess;
constexpr unsigned messages{1000000};
constexpr unsigned maxMessageSize{8192};
struct Message
{
std::string prefix;
template<typename Archive>
void serialize(Archive& ar, const unsigned int)
{
// clang-format off
ar & prefix;
// clang-format on
}
};
int loggerServerMain()
{
try
{
// Open a message queue.
ipc::message_queue mq(ipc::open_only, "message_queue");
unsigned int priority;
ipc::message_queue::size_type recvd_size;
for (int i = 0; i < messages; ++i)
{
std::string data;
data.resize(maxMessageSize);
mq.receive(&data[0], maxMessageSize, recvd_size, priority);
if (recvd_size > maxMessageSize)
return 1;
std::istringstream iss{data};
boost::archive::binary_iarchive ia{iss};
Message message;
ia >> message;
std::cout << message.prefix << '\n';
}
}
catch (const ipc::interprocess_exception& ex)
{
std::cout << ex.what() << std::endl;
return 1;
}
ipc::message_queue::remove("message_queue");
return 0;
}
int main()
{
try
{
ipc::message_queue::remove("message_queue");
ipc::message_queue mq(ipc::create_only, "message_queue", 100, maxMessageSize);
if (fork() == 0)
{
exit(loggerServerMain());
}
for (int i = 0; i < messages; ++i)
{
Message message{"lib" + std::to_string(i)};
std::ostringstream oss;
boost::archive::binary_oarchive oa{oss};
oa << message;
std::string data{oss.str()};
mq.send(data.data(), data.size(), 0);
}
}
catch (const ipc::interprocess_exception& ex)
{
std::cout << ex.what() << std::endl;
return 1;
}
return 0;
}
Итак, у нас есть процесс, который разветвляет сервер, а затем пересылает на сервер целую кучу сообщений, которые, в свою очередь, будут печатать сообщения. Мы используем жестко закодированное имя 'message_queue' для имени основной очереди сообщений.
Я был удивлен, обнаружив, что этот пример все еще работает, когда несколько экземпляров этого процесса запускаются одновременно, например, следующим образом:
./message_queue > 1.log & ; sleep 1 ; ./message_queue > 2.log
Оба журнала содержат все сообщения в правильном порядке. Первый экземпляр message_queue
все еще работает, когда запускается второй. Оба журнала создаются и записываются одновременно.
Как может случиться так, что нет конкуренции за очередь сообщений при одновременном использовании из нескольких групп процессов? Я предположил, что имя очереди сообщений является глобальным, определенным на системном уровне, но из моего примера кажется, что оно как-то определено для дерева процессов? Я копался в документации Boost Interprocess, но не нашел упоминаний об этом.
Я использую Linux 4.20, GCC 8.1.0. Пример кода скомпилирован следующим образом:
g++ -std=c++17 -O3 -o message_queue message_queue.cpp -lpthread -lboost_serialization -lrt