Ошибка при создании потока с прерыванием - PullRequest
0 голосов
/ 22 марта 2020

Я хочу создать поток, который может быть прерван во время ожидания (он ожидает данные от других процессов, и я хочу остановить процесс хорошим способом).

Я прочитал 9.2 часть Параллелизм C ++ в действии, 2-е издание , и я пытался реализовать эти идеи, но у меня есть некоторые проблемы, и я не знаю, где проверить.

Это мой код, основанный на этом пример:

#include <iostream>
#include <stdexcept>
#include <thread>
#include <mutex>
#include <atomic>
#include <condition_variable>
#include <future>

// Exception that should be raised when there's an interruption.
// It's raised when the thread is interrupted, so we can catch
// it and finish the thread execution.
class InterruptedException : public std::runtime_error {
public:
  InterruptedException(const std::string& message) : std::runtime_error(message) {}
  virtual ~InterruptedException() {}
};

// Interrupt flag. This class represents a local-thread flag that
// tells if the thread is interrupted or not.
class InterruptFlag {
public:

  InterruptFlag() :
    m_threadConditionVariable(nullptr),
    m_threadConditionVariableAny(nullptr) {}

  void set() {
    m_flag.store(true, std::memory_order_relaxed);
    std::lock_guard<std::mutex> lk(m_setClearMutex);
    if (m_threadConditionVariable) {
      m_threadConditionVariable->notify_all();
    }
    else if (m_threadConditionVariableAny) {
      m_threadConditionVariableAny->notify_all();
    }
  }

  template <typename Lockable>
  void wait(std::condition_variable_any& cv, Lockable& lk) {
    struct CustomLock {
      InterruptFlag* m_self;
      Lockable& m_lk;

      CustomLock(InterruptFlag* self, std::condition_variable_any& cond, Lockable& lk) :
        m_self(self),
        m_lk(lk) {
        m_self->m_setClearMutex.unlock();
        m_self->m_threadConditionVariableAny = &cond;
      }

      void unlock() {
        m_lk.unlock();
        m_self->m_setClearMutex.unlock();
      }

      void lock() {
        std::lock(m_self->m_setClearMutex, lk);
      }

      ~CustomLock() {
        m_self->m_threadConditionAny = nullptr;
        m_self->m_setClearMutex.unlock();
      }
    };

    CustomLock cl(this, cv, lk);
    InterruptPoint();
    cv.wait(cl);
    InterruptPoint();
  }

  void setConditionVariable(std::condition_variable& cv) {
    std::lock_guard<std::mutex> lk(m_setClearMutex);
    m_threadConditionVariable = &cv;
  }

  void clearConditionVariable() {
    std::lock_guard<std::mutex> lk(m_setClearMutex);
    m_threadConditionVariable = nullptr;
  }

  bool isSet() const {
    return m_flag.load(std::memory_order_relaxed);
  }

private:

  std::atomic<bool> m_flag;
  std::condition_variable* m_threadConditionVariable;
  std::condition_variable_any* m_threadConditionVariableAny;
  std::mutex m_setClearMutex;
};

// Thread-local interrupt flag instance. The variable should be
// created for every thread, since it's thread_local.
thread_local InterruptFlag ThisThreadInterruptFlag;

// Convenience class for cleaning the flag due to RAII.
struct ClearConditionVariableOnDestruct {
  ~ClearConditionVariableOnDestruct() {
    ThisThreadInterruptFlag.clearConditionVariable();
  }
};

// Function that throws the exception that tells that the thread
// is interrupted. For doing it checks the state of ThisThreadInterruptFlag.
void InterruptionPoint() {
  if (ThisThreadInterruptFlag.isSet()) {
    throw InterruptedException("Interrupted");
  }
}

// Function that must be used inside the thread function body for waiting.
// It waits for the condition variable, when it notifies from other threads,
// but it also notifies if the thread is interrupted.
void InterruptibleWait(std::condition_variable& cv, std::unique_lock<std::mutex>& lk) {
  InterruptionPoint();
  ThisThreadInterruptFlag.setConditionVariable(cv);
  ClearConditionVariableOnDestruct guard;
  InterruptionPoint();
  cv.wait_for(lk, std::chrono::milliseconds(1));
  InterruptionPoint();
}

// This class represents the interruptible thread. It adds a interrupt()
// method that when called interupts the thread execution, if it's waiting
// at some point where InterruptibleWait function is locked.
class Interruptible {
public:

  template <typename FunctionType>
  Interruptible(FunctionType f) {
    std::promise<InterruptFlag*> p;
    m_internalThread = std::thread([f, &p]() {
      p.set_value(&ThisThreadInterruptFlag);
      try {
        f();
      }
      catch (InterruptedException) {

      }
      });
    m_flag = p.get_future().get();
  }


  void join() {
    m_internalThread.join();
  }

  void detach() {
    m_internalThread.detach();
  }

  bool joinable() const {
    return m_internalThread.joinable();
  }

  void interrupt() {
    if (m_flag) {
      m_flag->set();
    }
  }

private:

  std::thread m_internalThread;
  InterruptFlag* m_flag;
};

std::mutex mtx;
std::unique_lock<std::mutex> lk(mtx);

int main(int argc, char* argv[]) {
  std::cout << "Interrupting thread example" << std::endl;
  bool test = false;
  std::condition_variable cv;
  auto f = [&cv, &test]() {
    test = true;
    InterruptibleWait(cv, lk);
    // Since it locks forever, it should never reach this point.
    test = false;
  };
  Interruptible interruptibleThread(f);
  std::this_thread::sleep_for(std::chrono::milliseconds(30));
  // We interrupt the function while it's blocked in InterruptibleWait
  interruptibleThread.interrupt();
  interruptibleThread.join();
  std::cout << "test value is " << std::boolalpha << test << ". It should be true." << std::endl;
  return 0;
}

По сути, я создаю класс Interruptible, представляющий поток, который может быть прерван. Я прерываю его во время выполнения, вызывая его метод interrupt(). Поток может быть прерван, если он заблокирован вызовом функции InterruptibleWait. Эта функция ведет себя как std::condition.wait(), на самом деле ей нужна ссылка на нее, но она также обрабатывает флаг прерывания.

Если я запускаю программу. Я получаю сообщение об ошибке в Visual Studio при запуске.

Я не знаю, что я делаю неправильно. Что я должен сделать, чтобы заставить InterruptibleWait работать правильно?

1 Ответ

1 голос
/ 22 марта 2020

Моя лучшая догадка на основании предоставленной информации:

Исключение не перехватывается в функции точки входа потока и выходит из этой функции. Когда это происходит в потоке, запущенном std :: thread, для вас вызывается abort (косвенно через std :: terminate) реализацией std :: thread, как того требует стандарт. Чтобы это исправить, попробуйте перехватить все исключения в функции, передаваемой в std :: thread.

См. Статьи cppreference по std :: thread и std :: terminate

...