У меня есть несколько процессов, которые должны общаться друг с другом.Я хочу сделать это с помощью очередей сообщений.Похоже, что для этого создана очередь сообщений.Похоже, что очереди повышения создаются после архитектуры главный-подчиненный (очередь должна быть создана до того, как какой-либо процесс сможет что-то отправить в эту очередь).Теперь вся система должна быть способна к восстановлению в любое время в случае сбоя одного из процессов.
Это то, что у меня есть до сих пор:
Common:
#include <iostream>
#include <cstring>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <iostream>
#include <boost/thread.hpp>
#include <boost/archive/text_oarchive.hpp>
#include <fstream>
using namespace boost::interprocess;
message_queue *queue = nullptr;
bool getData(uint8_t &data) {
if (queue == nullptr)
return false;
unsigned int priority;
message_queue::size_type recvd_size;
queue->try_receive(&data, sizeof(uint8_t), recvd_size, priority);
return recvd_size > 0;
}
void sendData(uint8_t data) {
if (queue == nullptr)
return;
queue->try_send(&data, sizeof(uint8_t), 0);
}
A (Передатчик):
int main()
{
reconnect:
try {
//Open a message queue.
queue = new message_queue(
open_only,
"message_queue"
);
} catch (interprocess_exception &ex) {
std::cout << "Error creating queue" << ex.get_error_code() << ex.what() << std::endl;
goto reconnect;
}
std::cout << "Transmitter connected!" << std::endl;
sendData(1);
return 0;
}
B (Приемник):
int main()
{
message_queue::remove("message_queue");
reconnect:
try {
//Open a message queue.
queue = new message_queue(
open_or_create,
"message_queue",
10,
sizeof(uint8_t)
);
} catch (interprocess_exception &ex) {
std::cout << "Error creating queue" << ex.get_error_code() << ex.what() << std::endl;
goto reconnect;
}
std::cout << "Receiver connected! Receiving..." << std::endl;
uint8_t data;
while(true) {
if (getData(data)) {
std::cout << "RX data " << data << std::endl;
}
}
}
Это, как правило, работает.Я беспокоюсь о:
- A: Создать очередь
- B: подключиться к очереди
- A: завершить / аварийно завершить работу и перезапустить
Теперь B подключен к очереди, которая была создана в результате сбоя процесса.По-прежнему ли B в каждом угловом случае подключен к вновь созданной очереди в B, если это то же имя?Как бы я обычно это реализовывал?Я, вероятно, был бы в безопасности, если бы просто открывал очередь с объектом в стеке перед каждым вызовом для отправки / получения, но это не так?