Ошибка при использовании thread_local для поддержки параллельного буфера памяти - PullRequest
0 голосов
/ 17 июня 2020

В следующем коде я хочу создать буфер памяти, который позволяет нескольким потокам читать / записывать его одновременно. Одновременно все потоки будут читать этот буфер параллельно, а позже они будут записывать в буфер параллельно. Но при этом не будет операций чтения / записи.

Для этого я использую vector из shared_ptr<vector<uint64_t>>. Когда прибывает новый поток, он будет выделен новым vector<uint64_t> и только писать в него. Два потока не будут писать в один и тот же вектор.

Я использую thread_local для отслеживания индекса вектора и смещения, в которое будет записывать текущий поток. Когда мне нужно добавить новый буфер в переменную memory_, я использую мьютекс для его защиты.

class TestBuffer {
public:
    thread_local static uint32_t index_;
    thread_local static uint32_t offset_;
    thread_local static bool ready_;

    vector<shared_ptr<vector<uint64_t>>> memory_;
    mutex lock_;

    void init() {
        if (!ready_) {
            new_slab();
            ready_ = true;
        }
    }

    void new_slab() {
        std::lock_guard<mutex> lock(lock_);
        index_ = memory_.size();
        memory_.push_back(make_shared<vector<uint64_t>>(1000));
        offset_ = 0;
    }

    void put(uint64_t value) {
        init();
        if (offset_ == 1000) {
            new_slab();
        }
        if(memory_[index_] == nullptr) {
            cout << "Error" << endl;
        }
        *(memory_[index_]->data() + offset_) = value;
        offset_++;
    }
};

thread_local uint32_t TestBuffer::index_ = 0;
thread_local uint32_t TestBuffer::offset_ = 0;
thread_local bool TestBuffer::ready_ = false;

int main() {
    TestBuffer buffer;
    vector<std::thread> threads;
    for (int i = 0; i < 10; ++i) {
        thread t = thread([&buffer, i]() {
            for (int j = 0; j < 10000; ++j) {
                buffer.put(i * 10000 + j);
            }
        });
        threads.emplace_back(move(t));
    }
    for (auto &t: threads) {
        t.join();
    }
}

Код работает не так, как ожидалось, и сообщает об ошибке в функции put . Причина root в том, что memory_[index_] иногда возвращает nullptr. Однако я не понимаю, почему это возможно, поскольку считаю, что правильно установил значения. Спасибо за помощь!

1 Ответ

1 голос
/ 17 июня 2020

У вас есть состояние гонки в put, вызванное new_slab(). Когда new_slab вызывает memory_.push_back(), вектор _memory может нуждаться в изменении размера, а если другой поток выполняет put во время изменения размера, memory_[index_] может получить доступ к устаревшим данным.

One Решение состоит в том, чтобы защитить вектор _memory путем блокировки мьютекса:

{
    std::lock_guard<mutex> lock(lock_);

    if(memory_[index_] == nullptr) {
        cout << "Error" << endl;
    }
    *(memory_[index_]->data() + offset_) = value;
}

Другой вариант - заранее зарезервировать необходимое пространство в векторе memory_.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...