Кажется, что мой потокобезопасный код очереди работает, любые возможные состояния гонки, взаимоблокировки или другие проблемы проектирования? - PullRequest
0 голосов
/ 08 июля 2020

Я новичок в использовании condition_variable s и unique_lock s в C ++. Я работаю над созданием события l oop, которое опрашивает две пользовательские очереди событий и «логическое» (см. Целое число, действующее как логическое), на которое могут действовать несколько источников.

У меня есть демонстрация (ниже), который, похоже, работает, и я был бы очень признателен, если бы вы могли просмотреть и подтвердить, соответствует ли он передовым методам использования unique_lock и condition_variable s, а также любые проблемы, которые вы предвидите (условия гонки, блокировка потоков и т. 1037 *).

  1. В ThreadSafeQueue::enqueue(...): мы разблокируем дважды, вызывая notify и имея unique_lock go вне области действия?

  2. В методе TheadSafeQueue::dequeueAll(): мы предполагаем, что он вызывается методом, который был уведомлен (cond.notify) и, следовательно, был заблокирован. Есть ли лучший способ инкапсулировать это, чтобы вызывающий абонент оставался более чистым?

  3. Нужно ли нам сделать члены нашего класса изменчивыми , подобными этому ?

  4. Есть ли лучший способ смоделировать нашу ситуацию, который позволяет нам проверить, правильно ли мы реализовали блокировки? Возможно, без операторов сна и автоматизации процесса проверки?

ThreadSafeQueue.h:

#include <condition_variable>
#include <cstdint>
#include <iostream>
#include <mutex>
#include <vector>

template <class T>
class ThreadSafeQueue {
 public:
  ThreadSafeQueue(std::condition_variable* cond, std::mutex* unvrsl_m)
      : ThreadSafeQueue(cond, unvrsl_m, 1) {}
  ThreadSafeQueue(std::condition_variable* cond, std::mutex* unvrsl_m,
                  uint32_t capacity)
      : cond(cond),
        m(unvrsl_m),
        head(0),
        tail(0),
        capacity(capacity),
        buffer((T*)malloc(get_size() * sizeof(T))),
        scratch_space((T*)malloc(get_size() * sizeof(T))) {}

  std::condition_variable* cond;

  ~ThreadSafeQueue() {
    free(scratch_space);
    free(buffer);
  }

  void resize(uint32_t new_cap) {
    std::unique_lock<std::mutex> lock(*m);
    check_params_resize(new_cap);

    free(scratch_space);
    scratch_space = buffer;
    buffer = (T*)malloc(sizeof(T) * new_cap);
    copy_cyclical_queue();
    free(scratch_space);
    scratch_space = (T*)malloc(new_cap * sizeof(T));

    tail = get_size();
    head = 0;
    capacity = new_cap;
  }
  void enqueue(const T& value) {
    std::unique_lock<std::mutex> lock(*m);
    resize();
    buffer[tail++] = value;
    if (tail == get_capacity()) {
      tail = 0;
    } else if (tail > get_capacity())
      throw("Something went horribly wrong TSQ: 75");
    cond->notify_one();
  }

// Assuming m has already been locked by the caller...
  void dequeueAll(std::vector<T>* vOut) {
    if (get_size() == 0) return;
    scratch_space = buffer;
    copy_cyclical_queue();
    vOut->insert(vOut->end(), buffer, buffer + get_size());
    head = tail = 0;
  }

  // Const functions because they shouldn't be modifying the internal variables
  // of the object
  bool is_empty() const { return get_size() == 0; }
  uint32_t get_size() const {
    if (head == tail)
      return 0;
    else if (head < tail) {
      // 1 2 3
      // 0 1 2
      // 1
      // 0
      return tail - head;
    } else {
      // 3 _ 1 2
      // 0 1 2 3
      // capacity-head + tail+1 = 4-2+0+1 = 2 + 1
      return get_capacity() - head + tail + 1;
    }
  }
  uint32_t get_capacity() const { return capacity; }
  //---------------------------------------------------------------------------
 private:
  std::mutex* m;
  uint32_t head;
  uint32_t tail;
  uint32_t capacity;
  T* buffer;
  T* scratch_space;
  uint32_t get_next_empty_spot();
  void copy_cyclical_queue() {
    uint32_t size = get_size();
    uint32_t cap = get_capacity();
    if (size == 0) {
      return;  // because we have nothing to copy
    }
    if (head + size <= cap) {
      // _ 1 2 3 ... index = 1, size = 3, 1+3 = 4 = capacity... only need 1 copy
      memcpy(buffer, scratch_space + head, sizeof(T) * size);
    } else {
      // 5 1 2 3 4 ... index = 1, size = 5, 1+5 = 6 = capacity... need to copy
      // 1-4 then 0-1

      // copy number of bytes: front = 1, to (5-1 = 4 elements)
      memcpy(buffer, scratch_space + head, sizeof(T) * (cap - head));
      // just copy the bytes from the front up to the first element in the old
      // array
      memcpy(buffer + (cap - head), scratch_space, sizeof(T) * tail);
    }
  }
  void check_params_resize(uint32_t new_cap) {
    if (new_cap < get_size()) {
      std::cerr << "ThreadSafeQueue: check_params_resize: size(" << get_size()
                << ") > new_cap(" << new_cap
                << ")... data "
                   "loss will occur if this happens. Prevented."
                << std::endl;
    }
  }
  void resize() {
    uint32_t new_cap;
    uint32_t size = get_size();
    uint32_t cap = get_capacity();
    if (size + 1 >= cap - 1) {
      std::cout << "RESIZE CALLED --- BAD" << std::endl;
      new_cap = 2 * cap;

      check_params_resize(new_cap);

      free(scratch_space);     // free existing (too small) scratch space
      scratch_space = buffer;  // transfer pointer over
      buffer = (T*)malloc(sizeof(T) * new_cap);  // allocate a bigger buffer
      copy_cyclical_queue();
      // move over everything with memcpy from scratch_space to buffer
      free(scratch_space);  // free what used to be the too-small buffer
      scratch_space =
          (T*)malloc(sizeof(T) * new_cap);  // recreate scratch space

      tail = size;
      head = 0;
      // since we're done with the old array... delete for memory management->

      capacity = new_cap;
    }
  }
};
// Event Types
// keyboard/mouse
// network
// dirty flag

Main. cpp:


#include <unistd.h>

#include <cstdint>
#include <iostream>
#include <mutex>
#include <queue>
#include <sstream>
#include <thread>

#include "ThreadSafeQueue.h"
using namespace std;

void write_to_threadsafe_queue(ThreadSafeQueue<uint32_t> *q,
                               uint32_t startVal) {
  uint32_t count = startVal;
  while (true) {
    q->enqueue(count);
    cout << "Successfully enqueued: " << count << endl;
    count += 2;
    sleep(count);
  }
}

void sleep_and_set_redraw(int *redraw, condition_variable *cond) {
  while (true) {
    sleep(3);
    __sync_fetch_and_or(redraw, 1);
    cond->notify_one();
  }
}

void process_events(vector<uint32_t> *qOut, condition_variable *cond,
                    ThreadSafeQueue<uint32_t> *q1,
                    ThreadSafeQueue<uint32_t> *q2, int *redraw, mutex *m) {
  while (true) {
    unique_lock<mutex> lck(*m);
    cond->wait(lck);
    q1->dequeueAll(qOut);
    q2->dequeueAll(qOut);
    if (__sync_fetch_and_and(redraw, 0)) {
      cout << "FLAG SET" << endl;
      qOut->push_back(0);
    }
    for (auto a : *qOut) cout << a << "\t";
    cout << endl;
    cout << "PROCESSING: " << qOut->size() << endl;
    qOut->clear();
  }
}

void test_2_queues_and_bool() {
  try {
    condition_variable cond;
    mutex m;
    ThreadSafeQueue<uint32_t> q1(&cond, &m, 1024);
    ThreadSafeQueue<uint32_t> q2(&cond, &m, 1024);
    int redraw = 0;
    vector<uint32_t> qOut;
    thread t1(write_to_threadsafe_queue, &q1, 2);
    thread t2(write_to_threadsafe_queue, &q2, 1);
    thread t3(sleep_and_set_redraw, &redraw, &cond);
    thread t4(process_events, &qOut, &cond, &q1, &q2, &redraw, &m);
    t1.join();
    t2.join();
    t3.join();
    t4.join();
  } catch (system_error &e) {
    cout << "MAIN TEST CRASHED" << e.what();
  }
}

int main() { test_2_queues_and_bool(); }
...