Я разделяю boost::interprocess::vector
между двумя процессами, используя boost::interprocess::named_mutex
и boost::interprocess::named_condition
.
Читатель начинает с получения мьютекса и ожидает записи данных. Автор получает мьютекс, начинает писать, но висит на строке, где он обновляет общий вектор .
Если я запускаю программу, я получаю следующий вывод:
Reader trying to get mutex
Reader waiting for data
Writer attempting to get mutex
Writer got mutex. Number of items to write: 2
Writing value: 1
У писателя есть два элемента для вставки в вектор, но по какой-то причине он останавливается после вставки первого и просто зависает.
Это код для автора (полный код ниже):
void write(const std::vector<T>& items)
{
std::cout << "Writer attempting to get mutex" << std::endl;
scoped_lock<named_mutex> lock(*mutex);
{
std::cout << "Writer got mutex. Number of items to write: " << items.size() << std::endl;
for(const auto& item : items)
{
std::cout << "Writing value: " << item << std::endl;
vec->push_back(item); // <--------------------------- HANGS HERE -----
}
std::cout << "Writer notifying reader" << std::endl;
cond_empty->notify_all();
}
std::cout << "Writer finished" << std::endl;
}
Это полный код (должен быть в состоянии скопировать, вставить и запустить):
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/containers/vector.hpp>
#include <boost/interprocess/allocators/allocator.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <boost/interprocess/sync/named_mutex.hpp>
#include <boost/interprocess/sync/named_condition.hpp>
#include <string>
#include <cstdlib> //std::system
#include <iostream>
#include <memory>
using namespace boost::interprocess;
template<typename T>
struct MySharedData
{
using ShmemAllocator = allocator<T, managed_shared_memory::segment_manager>;
using MyVector = vector<T, ShmemAllocator>;
MySharedData(const bool isConsumer, const std::string& sharedMemoryName, const std::string& blockName, const int numBytes) : shared_memory_name(sharedMemoryName), block_name(blockName)
{
is_consumer = isConsumer;
segment.reset(new managed_shared_memory(open_or_create, sharedMemoryName.c_str(), numBytes));
const ShmemAllocator alloc_inst(segment->get_segment_manager());
vec = segment->find_or_construct<MyVector>(blockName.c_str())(alloc_inst);
cond_empty.reset(new named_condition(open_or_create, sharedMemoryName.c_str()));
mutex.reset(new named_mutex(open_or_create, sharedMemoryName.c_str()));
}
~MySharedData()
{
if(is_consumer)
{
segment->destroy<MyVector>(block_name.c_str());
}
}
void write(const std::vector<T>& items)
{
std::cout << "Writer attempting to get mutex" << std::endl;
scoped_lock<named_mutex> lock(*mutex);
{
std::cout << "Writer got mutex. Number of items to write: " << items.size() << std::endl;
for(const auto& item : items)
{
std::cout << "Writing value: " << item << std::endl;
vec->push_back(item); // <--------------------------- HANGS HERE -----
}
std::cout << "Writer notifying reader" << std::endl;
cond_empty->notify_all();
}
std::cout << "Writer finished" << std::endl;
}
std::vector<T> read()
{
std::vector<T> toReturn;
bool continue_trying = true;
while(continue_trying)
{
std::cout << "Reader trying to get mutex" << std::endl;
scoped_lock<named_mutex> lock(*mutex);
{
if(nullptr != vec )
{
if(vec->empty())
{
std::cout << "Reader waiting for data" << std::endl;
cond_empty->wait(lock);
std::cout << "Reader notified of data" << std::endl;
}
for(auto& t : *vec)
{
std::cout << "Reading: " << t << std::endl;
toReturn.push_back(t);
}
continue_trying = false;
}
else
{
std::cout << "No data to read from shared memory: " << shared_memory_name << " block: " << block_name << std::endl;
continue_trying = false;
}
}
}
std::cout << "Reader finished" << std::endl;
return toReturn;
}
std::unique_ptr<named_mutex> mutex{nullptr};
MyVector* vec{nullptr};
std::unique_ptr<managed_shared_memory> segment{nullptr};
std::unique_ptr<named_condition> cond_empty;
bool is_consumer{false};
std::string shared_memory_name;
std::string block_name;
};
void parent()
{
MySharedData<int> msd1(false, "a", "b", 100000);
std::vector<int> vec;
vec.push_back(1);
vec.push_back(2);
msd1.write(vec);
}
void child()
{
MySharedData<int> msd2(true, "a", "b", 100000);
std::vector<int> x = msd2.read();
}
int main()
{
shared_memory_object::remove("a");
shared_memory_object::remove("b");
shared_memory_object::remove("c");
shared_memory_object::remove("d");
named_mutex::remove("a");
named_mutex::remove("b");
named_mutex::remove("c");
named_mutex::remove("d");
named_condition::remove("a");
named_condition::remove("b");
named_condition::remove("c");
named_condition::remove("d");
// The below code spawns the parent method off to a separate process
pid_t pid = fork();
if(pid == 0)
{
//child();
parent();
}
else if(pid > 0)
{
//parent();
child();
}
std::cout << "FINISHED" << std::endl;
}