Basic Single Producer, Single Consumer Queue.Память не освобождена - PullRequest
1 голос
/ 19 марта 2019

Я хочу реализовать базового единого производителя - WorkerQueue с одним потребителем.Проблема в том, что память объектов в очереди не освобождается.

Если I

  1. заполнить WorkerQueue :: queue_ 1000 объектами размером 1 МБ
  2. вызовWorkerQueue :: Start ()

использование памяти приложением медленно сокращается с 1 ГБ до нуля.

Если I

  1. , вызовите WorkerQueue ::Начните();
  2. заполнение WorkerQueue :: queue_ 1000 объектами размером 1 МБ

использование памяти приложением остается постоянным на уровне 1 ГБ до завершения работы программ.

В обоих случаяхвсе пакеты обрабатываются, и деструктор пакетов вызывается 1000 раз.Таким образом, несмотря на вызов деструктора Пакета, память не освобождается.Конечно, я хочу иметь возможность ставить элементы в очередь после вызова :: Start ().

Платформа: Ubuntu 18.10, g ++ - 8

g++ -pthread -g -std=gnu++14 -o mytest main.cpp

#include <iostream>
#include <condition_variable>
#include <thread>
#include <vector>
#include <mutex>

using namespace std;

struct Packet {
    explicit Packet(size_t size) {
        data_.resize(size );
    }
    virtual ~Packet() {
        data_ = std::vector<char>();
        std::cout << "~Packet();\n";
    }
    std::vector<char> data_;
};

template<class T>
struct WorkerQueue {
    void Start() {
        t_ = std::move(thread{&WorkerQueue::Run, this});
    }

    void Enqueue(T t) {
        std::lock_guard<std::mutex> l{m_};
        queue_.push_back(std::move(t));
        cv_.notify_one();
    }

    void Run() {
        while(true) {
            {
                std::unique_lock<std::mutex> l{m_};
                cv_.wait(l, [&](){return !queue_.empty();});
                // calls T::~T()
                queue_.erase(begin(queue_));
                std::cout << "q_size: " << queue_.size() << endl;
            }
            // simulate workload
            this_thread::sleep_for(50ms);
        }
    }

    std::vector<T> queue_;
    thread t_;
    mutex m_;
    condition_variable cv_;
};


int main() {
    const size_t kPacketSize = 1*1000*1000;
    WorkerQueue<std::unique_ptr<Packet>> wq;

    wq.Start();// DOES NOT free memory
    for(int i=0;i<1000;++i) {
        wq.Enqueue(make_unique<Packet>(kPacketSize));
    }
//    wq.Start();// DOES free memory

    this_thread::sleep_for(60s);
}
...