связанные с многопоточностью активные вопросы проектирования объектов (повышение C ++) - PullRequest
1 голос
/ 25 ноября 2010

Я хотел бы получить отзыв о классе IService, указанном ниже. Из того, что я знаю, этот тип класса связан с шаблоном «активный объект». Пожалуйста, извините / исправьте, если я использую любую связанную терминологию неправильно. По сути, идея заключается в том, что классы, использующие этот активный объектный класс, должны предоставлять метод start и stop, который контролирует некоторый цикл событий. Этот цикл событий может быть реализован с помощью цикла while или boost asio и т. Д.

Этот класс отвечает за запуск нового потока неблокирующим образом, чтобы события могли обрабатываться в / новым потоком. Он также должен обрабатывать весь код, связанный с очисткой. Сначала я попробовал подход ОО, в котором подклассы отвечали за переопределение методов для управления циклом событий, но очистка была грязной: в деструкторе вызов метода stop приводил к чисто виртуальному вызову функции в тех случаях, когда вызывающий класс вручную не вызывал остановить метод. Шаблонное решение кажется намного чище:

template <typename T>
class IService : private boost::noncopyable
{
    typedef boost::shared_ptr<boost::thread> thread_ptr;
public:

  IService()
  {
  }

  ~IService()
  {
    /// try stop the service in case it's running
    stop();
  }

  void start()
  {
    boost::mutex::scoped_lock lock(m_threadMutex);

    if (m_pServiceThread && m_pServiceThread->joinable())
    {
      // already running
      return;
    }

    m_pServiceThread = thread_ptr(new boost::thread(boost::bind(&IService::main, this)));

    // need to wait for thread to start: else if destructor is called before thread has started

    // Wait for condition to be signaled and then
    // try timed wait since the application could deadlock if the thread never starts?
    //if (m_startCondition.timed_wait(m_threadMutex, boost::posix_time::milliseconds(getServiceTimeoutMs())))
    //{
    //}
    m_startCondition.wait(m_threadMutex);

    // notify main to continue: it's blocked on the same condition var
    m_startCondition.notify_one();
  }

  void stop()
  {
    // trigger the stopping of the event loop
    m_serviceObject.stop();

    if (m_pServiceThread)
    {
      if (m_pServiceThread->joinable())
      {
        m_pServiceThread->join();
      }
      // the service is stopped so we can reset the thread
      m_pServiceThread.reset();
    }
  }

private:
  /// entry point of thread
  void main()
  {
    boost::mutex::scoped_lock lock(m_threadMutex);
    // notify main thread that it can continue
    m_startCondition.notify_one();

    // Try Dummy wait to allow 1st thread to resume???
    m_startCondition.wait(m_threadMutex);

    // call template implementation of event loop
    m_serviceObject.start();
  }

  /// Service thread
  thread_ptr m_pServiceThread;
  /// Thread mutex
  mutable boost::mutex m_threadMutex;
  /// Condition for signaling start of thread
  boost::condition m_startCondition;

  /// T must satisfy the implicit service interface and provide a start and a stop method
  T m_serviceObject;
};

Класс можно использовать следующим образом:

class TestObject3
{
public:
  TestObject3()
      :m_work(m_ioService),
      m_timer(m_ioService, boost::posix_time::milliseconds(200))
  {
      m_timer.async_wait(boost::bind(&TestObject3::doWork, this, boost::asio::placeholders::error));
  }

  void start()
  {
      // simple event loop
      m_ioService.run();
  }

  void stop()
  {
      // signal end of event loop
      m_ioService.stop();
  }

  void doWork(const boost::system::error_code& e)
  {
      // Do some work here
      if (e != boost::asio::error::operation_aborted)
      {
      m_timer.expires_from_now( boost::posix_time::milliseconds(200) );
      m_timer.async_wait(boost::bind(&TestObject3::doWork, this, boost::asio::placeholders::error));
      }
  }

private:
  boost::asio::io_service m_ioService;
  boost::asio::io_service::work m_work;
  boost::asio::deadline_timer m_timer;
};

Теперь к моим конкретным вопросам:

1) Правильно ли используется переменная условия форсирования? Для меня это немного хакерство: я хотел дождаться запуска потока, поэтому я ждал переменную условия. Затем, после запуска нового потока в методе main, я снова жду той же переменной условия, чтобы позволить исходному потоку продолжить. Затем, после завершения метода запуска начального потока, новый поток может продолжаться. Это нормально?

2) Есть ли случаи, когда поток не будет успешно запущен ОС? Я помню, где-то читал, что это может произойти. Если это возможно, мне лучше подождать время условной переменной (как это закомментировано в методе start)?

3) Мне известно, что шаблонный класс не может правильно реализовать метод stop, т. Е. Если не удается остановить цикл обработки событий, код будет блокироваться в соединениях (либо в остановке, либо в деструкторе), но я не вижу пути обойти это. Я полагаю, что пользователь класса должен убедиться, что метод start и stop реализован правильно?

4) Буду признателен за любые другие ошибки проектирования, улучшения и т. Д.

Спасибо!

1 Ответ

0 голосов
/ 15 марта 2011

В итоге остановились на следующем:

1) После большого тестирования использование условной переменной кажется нормальным

2) Эта проблема не возникла (пока)

3) Реализация шаблонного класса должна соответствовать требованиям, для проверки корректности используются юнит-тесты

4) Улучшения

  • Добавлено соединение с блокировкой
  • Перехват исключений впорождал поток и перебрасывал в основном потоке, чтобы избежать сбоев и не потерять информацию об исключении
  • Использование boost :: system :: error_code для передачи кодов ошибок обратно вызывающей стороне
  • объект реализации доступен для установки

Код:

template <typename T>
class IService : private boost::noncopyable
{
  typedef boost::shared_ptr<boost::thread> thread_ptr;
  typedef T ServiceImpl;
public:
  typedef boost::shared_ptr<IService<T> > ptr;

  IService()
    :m_pServiceObject(&m_serviceObject)
  {
  }

  ~IService()
  {
    /// try stop the service in case it's running
    if (m_pServiceThread && m_pServiceThread->joinable())
    {
      stop();
    }
  }

  static ptr create()
  {
    return boost::make_shared<IService<T> >();
  }

  /// Accessor to service implementation. The handle can be used to configure the implementation object
  ServiceImpl& get() { return m_serviceObject; }
  /// Mutator to service implementation. The handle can be used to configure the implementation object
  void set(ServiceImpl rServiceImpl)
  {
    // the implementation object cannot be modified once the thread has been created
    assert(m_pServiceThread == 0);
    m_serviceObject = rServiceImpl;
    m_pServiceObject = &m_serviceObject;
  }

  void set(ServiceImpl* pServiceImpl)
  {
    // the implementation object cannot be modified once the thread has been created
    assert(m_pServiceThread == 0);

    // make sure service object is valid
    if (pServiceImpl)
      m_pServiceObject = pServiceImpl; 
  }

  /// if the service implementation reports an error from the start or stop method call, it can be accessed via this method
  /// NB: only the last error can be accessed
  boost::system::error_code getServiceErrorCode() const { return m_ecService; }

  /// The join method allows the caller to block until thread completion
  void join()
  {
    // protect this method from being called twice (e.g. by user and by stop)
    boost::mutex::scoped_lock lock(m_joinMutex);
    if (m_pServiceThread && m_pServiceThread->joinable())
    {
      m_pServiceThread->join();
      m_pServiceThread.reset();
    }
  }

  /// This method launches the non-blocking service
  boost::system::error_code start()
  {
    boost::mutex::scoped_lock lock(m_threadMutex);

    if (m_pServiceThread && m_pServiceThread->joinable())
    {
      // already running
      return boost::system::error_code(SHARED_INVALID_STATE, shared_category);
    }

    m_pServiceThread = thread_ptr(new boost::thread(boost::bind(&IService2::main, this)));
    // Wait for condition to be signaled
    m_startCondition.wait(m_threadMutex);

    // notify main to continue: it's blocked on the same condition var
    m_startCondition.notify_one();
    // No error
    return boost::system::error_code();
  }

  /// This method stops the non-blocking service
  boost::system::error_code stop()
  {
    // trigger the stopping of the event loop
    //boost::system::error_code ec = m_serviceObject.stop();
    assert(m_pServiceObject);
    boost::system::error_code ec = m_pServiceObject->stop();
    if (ec)
    {
      m_ecService = ec;
      return ec;
    }

    // The service implementation can return an error code here for more information
    // However it is the responsibility of the implementation to stop the service event loop (if running)
    // Failure to do so, will result in a block
    // If this occurs in practice, we may consider a timed join?
    join();

    // If exception was thrown in new thread, rethrow it.
    // Should the template implementation class want to avoid this, it should catch the exception
    // in its start method and then return and error code instead
    if( m_exception )
      boost::rethrow_exception(m_exception);

    return ec;
  }

private:
  /// runs in it's own thread
  void main()
  {
    try
    {
      boost::mutex::scoped_lock lock(m_threadMutex);
      // notify main thread that it can continue
      m_startCondition.notify_one();
      // Try Dummy wait to allow 1st thread to resume
      m_startCondition.wait(m_threadMutex);

      // call implementation of event loop
      // This will block
      // In scenarios where the service fails to start, the implementation can return an error code
      m_ecService = m_pServiceObject->start();

      m_exception = boost::exception_ptr();
    } 
    catch (...)
    {
      m_exception = boost::current_exception();
    }
  }

  /// Service thread
  thread_ptr m_pServiceThread;
  /// Thread mutex
  mutable boost::mutex m_threadMutex;
  /// Join mutex
  mutable boost::mutex m_joinMutex;
  /// Condition for signaling start of thread
  boost::condition m_startCondition;

  /// T must satisfy the implicit service interface and provide a start and a stop method
  T m_serviceObject;
  T* m_pServiceObject;
  // Error code for service implementation errors
  boost::system::error_code m_ecService;

  // Exception ptr to transport exception across different threads
  boost::exception_ptr m_exception;
};

Дальнейшие отзывы / критика, конечно, приветствуются.

...