Очередь слияния std :: maps в одну std :: map параллельно - PullRequest
0 голосов
/ 21 апреля 2020

У меня такая проблема. У меня есть самостоятельно реализованная поточно-ориентированная очередь. Я заполнил его std::map<std::string, int> с, следующей структуры: слово - число его вхождений. Поэтому я хочу объединить эти карты, чтобы получить карту, которая будет представлять общее количество вхождений этих слов. Я хочу сделать это параллельно, поэтому я реализовал следующую функцию (слияние), но я думаю, что это может быть в том случае, когда отображение завершится sh, тогда как еще будут карты для слияния. Как я могу это исправить? Вот мой код собственной реализации очереди, функции слияния и пример выполнения.

template<class T>
class safeQueue {
    std::deque<T> que;
    mutable std::mutex m;
    std::condition_variable cvNotEmpty;
    std::condition_variable cvNotMax;
    size_t max_size = 10;

public:
    safeQueue(){}

    void setMaxSize(size_t newMax) {
        std::lock_guard<std::mutex> lg{m};
        max_size = newMax;
    }

    auto size() const {
        std::lock_guard<std::mutex> lg{m};
        return que.size();
    }



    void pushFront(T item) {
        std::unique_lock<std::mutex> lg{m};
        cvNotMax.wait(lg, [this](){return max_size != que.size();});

        que.push_front(item);
        cvNotEmpty.notify_one();
    }

    void pushBack(T item) {
        std::unique_lock<std::mutex> lg{m};
        cvNotMax.wait(lg, [this](){return max_size != que.size();});

        que.push_back(item);
        cvNotEmpty.notify_one();
    }



    T popBack(){
        std::unique_lock<std::mutex> lg{m};
        cvNotEmpty.wait(lg, [this](){return que.size() != 0;});
        T result = que.back();
        que.pop_back();
        return result;
    }

    T popFront(){
        std::unique_lock<std::mutex> lg{m};
        cvNotEmpty.wait(lg, [this](){return que.size() != 0;});
        T result = que.front();
        que.pop_front();
        return result;
    }
};
void merge(safeQueue<std::map<std::string, int>>& que) {

    while (que.size() >= 2) {

        std::map<std::string, int> map1 = que.popFront();
        std::map<std::string, int> map2 = que.popFront();
        for (const auto &element : map2) {
            map1[element.first] += element.second;
        }

        que.pushBack(map1);
    }

}
int main() {
    auto mergers = new std::vector<std::thread>;

    std::map<std::string, int> map1;
    std::map<std::string, int> map2;
    std::map<std::string, int> map3;
    std::map<std::string, int> map5;

    map1.insert(std::pair("a", 3));
    map1.insert(std::pair("b", 3));
    map1.insert(std::pair("c", 3));
    map1.insert(std::pair("d", 3));
    map1.insert(std::pair("e", 3));
    map1.insert(std::pair("f", 3));

    map2.insert(std::pair("a", 3));
    map2.insert(std::pair("b", 3));
    map2.insert(std::pair("c1", 3));
    map2.insert(std::pair("d", 3));
    map2.insert(std::pair("e1", 3));
    map2.insert(std::pair("f1", 3));

    map3.insert(std::pair("a", 3));
    map3.insert(std::pair("b", 3));
    map3.insert(std::pair("c2", 3));
    map3.insert(std::pair("d", 3));
    map3.insert(std::pair("e2", 3));
    map3.insert(std::pair("f2", 3));

    map5.insert(std::pair("a", 3));
    map5.insert(std::pair("b", 3));
    map5.insert(std::pair("c4", 3));
    map5.insert(std::pair("d", 3));
    map5.insert(std::pair("e4", 3));
    map5.insert(std::pair("f4", 3));


    safeQueue<std::map<std::string, int>> que;
    que.pushBack(map1);
    que.pushBack(map2);
    que.pushBack(map3);
    que.pushBack(map5);

    int num_mergers = 3;

    for (int i =0; i < num_mergers; i++) {
        mergers->emplace_back(std::thread(merge, std::ref(que)));
    }

    for (int k=0; k < num_mergers; k++) {
        mergers->at(k).join();
    }

    std::map<std::string, int> map_res = que.popBack();

    for (const auto& c : map_res) {
        std:: cout << c.first + " " << c.second << std::endl;
    }

    return 0;
}

1 Ответ

0 голосов
/ 21 апреля 2020

Проблема в том, что safeQueue::size может вернуть устаревший результат, так как другой поток может вставить или удалить элемент, как только safeQueue::size снимет внутреннюю блокировку. Для вашей функции merge это означает, что к моменту проверки условия while (que.size() >= 2) размер очереди, возможно, уже изменился. В частности, merge может возвращаться, когда размер очереди больше 1.

Вам необходимо заблокировать мьютекс внутренней очереди и выполнить три действия, не освобождая его:

  1. Протестировать очередь размер. Возврат, если значение меньше 2.
  2. Снятие очереди с первого элемента.
  3. Снятие очереди с второго элемента.

Вы все равно можете получить более одного элемента в очереди после merge, если некоторые другие потоки добавляют в очередь новые элементы, кроме merge. Если это так, вам нужно быть готовым к этому или блокировать эти другие потоки, пока merge не завершится и вы не обработаете его результат.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...