У меня такая проблема. У меня есть самостоятельно реализованная поточно-ориентированная очередь. Я заполнил его std::map<std::string, int>
с, следующей структуры: слово - число его вхождений. Поэтому я хочу объединить эти карты, чтобы получить карту, которая будет представлять общее количество вхождений этих слов. Я хочу сделать это параллельно, поэтому я реализовал следующую функцию (слияние), но я думаю, что это может быть в том случае, когда отображение завершится sh, тогда как еще будут карты для слияния. Как я могу это исправить? Вот мой код собственной реализации очереди, функции слияния и пример выполнения.
template<class T>
class safeQueue {
std::deque<T> que;
mutable std::mutex m;
std::condition_variable cvNotEmpty;
std::condition_variable cvNotMax;
size_t max_size = 10;
public:
safeQueue(){}
void setMaxSize(size_t newMax) {
std::lock_guard<std::mutex> lg{m};
max_size = newMax;
}
auto size() const {
std::lock_guard<std::mutex> lg{m};
return que.size();
}
void pushFront(T item) {
std::unique_lock<std::mutex> lg{m};
cvNotMax.wait(lg, [this](){return max_size != que.size();});
que.push_front(item);
cvNotEmpty.notify_one();
}
void pushBack(T item) {
std::unique_lock<std::mutex> lg{m};
cvNotMax.wait(lg, [this](){return max_size != que.size();});
que.push_back(item);
cvNotEmpty.notify_one();
}
T popBack(){
std::unique_lock<std::mutex> lg{m};
cvNotEmpty.wait(lg, [this](){return que.size() != 0;});
T result = que.back();
que.pop_back();
return result;
}
T popFront(){
std::unique_lock<std::mutex> lg{m};
cvNotEmpty.wait(lg, [this](){return que.size() != 0;});
T result = que.front();
que.pop_front();
return result;
}
};
void merge(safeQueue<std::map<std::string, int>>& que) {
while (que.size() >= 2) {
std::map<std::string, int> map1 = que.popFront();
std::map<std::string, int> map2 = que.popFront();
for (const auto &element : map2) {
map1[element.first] += element.second;
}
que.pushBack(map1);
}
}
int main() {
auto mergers = new std::vector<std::thread>;
std::map<std::string, int> map1;
std::map<std::string, int> map2;
std::map<std::string, int> map3;
std::map<std::string, int> map5;
map1.insert(std::pair("a", 3));
map1.insert(std::pair("b", 3));
map1.insert(std::pair("c", 3));
map1.insert(std::pair("d", 3));
map1.insert(std::pair("e", 3));
map1.insert(std::pair("f", 3));
map2.insert(std::pair("a", 3));
map2.insert(std::pair("b", 3));
map2.insert(std::pair("c1", 3));
map2.insert(std::pair("d", 3));
map2.insert(std::pair("e1", 3));
map2.insert(std::pair("f1", 3));
map3.insert(std::pair("a", 3));
map3.insert(std::pair("b", 3));
map3.insert(std::pair("c2", 3));
map3.insert(std::pair("d", 3));
map3.insert(std::pair("e2", 3));
map3.insert(std::pair("f2", 3));
map5.insert(std::pair("a", 3));
map5.insert(std::pair("b", 3));
map5.insert(std::pair("c4", 3));
map5.insert(std::pair("d", 3));
map5.insert(std::pair("e4", 3));
map5.insert(std::pair("f4", 3));
safeQueue<std::map<std::string, int>> que;
que.pushBack(map1);
que.pushBack(map2);
que.pushBack(map3);
que.pushBack(map5);
int num_mergers = 3;
for (int i =0; i < num_mergers; i++) {
mergers->emplace_back(std::thread(merge, std::ref(que)));
}
for (int k=0; k < num_mergers; k++) {
mergers->at(k).join();
}
std::map<std::string, int> map_res = que.popBack();
for (const auto& c : map_res) {
std:: cout << c.first + " " << c.second << std::endl;
}
return 0;
}