Каков наилучший способ обмена контейнерами данных между потоками в C ++ - PullRequest
0 голосов
/ 25 мая 2018

У меня есть приложение, которое имеет несколько уровней обработки, таких как:

InputStream->Pre-Processing->Computation->OutputStream

Каждая из этих сущностей работает в отдельном потоке.Итак, в моем коде у меня есть общий поток, которому принадлежит

std::vector<ImageRead> m_readImages;

, а затем он передает эту переменную-член в каждый поток:

InputStream input{&m_readImages};
std::thread threadStream{&InputStream::start, &InputStream};
PreProcess pre{&m_readImages};
std::thread preStream{&PreProcess::start, &PreProcess};
...

И каждый из этих классов имеет указательчлен к этим данным:

std::vector<ImageRead>* m_ptrReadImages;

У меня также определен глобальный мьютекс, который я блокирую и разблокирую при каждой операции чтения / записи в этот общий контейнер.Что меня беспокоит, так это то, что этот механизм довольно неясен, и иногда я путаюсь, используют ли данные другой поток или нет.

Так как проще разделить этот контейнер между этими потоками?

Ответы [ 6 ]

0 голосов
/ 25 мая 2018

Это проблема шаблона проектирования.Я предлагаю прочитать о шаблоне проектирования параллелизма и посмотреть, есть ли что-нибудь, что могло бы вам помочь.

Если вы хотите добавить параллелизм к следующему последовательному процессу.

InputStream->Pre-Processing->Computation->OutputStream

Тогда я предлагаюиспользовать активный шаблон проектирования объекта.Таким образом, каждый процесс не блокируется предыдущим шагом и может выполняться одновременно.Это также очень просто реализовать (Вот реализация: http://www.drdobbs.com/parallel/prefer-using-active-objects-instead-of-n/225700095)

Что касается вашего вопроса о каждом потоке, совместно использующего DTO. Это легко решается с помощью оболочки на DTO. Эта оболочка будет содержать write ифункции чтения. Функции записи блокируются с помощью мьютекста, а чтение возвращает константные данные.

Однако я думаю, что ваша проблема заключается в разработке. Если процесс последовательный, как вы описали, то почему каждый процесс совместно использует данные? Данные должны быть переданы в следующий процесс после завершения текущего. Другими словами, каждый процесс должен быть отделен.

0 голосов
/ 25 мая 2018

Вы правильно используете мьютексы и блокировки.Для C ++ 11 это действительно самый элегантный способ доступа к сложным данным между потоками.

0 голосов
/ 25 мая 2018

Игнорирование вопроса «Должна ли каждая операция выполняться в отдельном потоке», кажется, что объекты, которые вы хотите обработать, перемещаются из потока в поток.По сути, они однозначно принадлежат только одному потоку за раз (ни одному потоку никогда не требуется доступ к каким-либо данным из других потоков).Есть способ выразить это в C ++: std::unique_ptr.

Каждый шаг в этом случае работает только с собственным изображением.Все, что вам нужно сделать, - это найти поточно-ориентированный способ, позволяющий перемещать владельца ваших изображений по шагам процесса один за другим, что означает, что критические разделы находятся только на границах между задачами.Поскольку у вас их несколько, абстрагировать их было бы разумно:

class ProcessBoundary
{
public:
  void setImage(std::unique_ptr<ImageRead> newImage)
  {
    while (running)
    {
      {
        std::lock_guard<m_mutex> guard;
        if (m_imageToTransfer == nullptr)
        {
          // Image has been transferred to next step, so we can place this one here.
          m_imageToTransfer = std::move(m_newImage);
          return;
        }
      }
      std::this_thread::yield();
    }
  }

  std::unique_ptr<ImageRead> getImage()
  {
    while (running)
    {
      {
        std::lock_guard<m_mutex> guard;
        if (m_imageToTransfer != nullptr)
        {
          // Image has been transferred to next step, so we can place this one here.
          return std::move(m_imageToTransfer);
        }
      }
      std::this_thread::yield();
    }
  }

  void stop()
  {
    running = false;
  }

private:
  std::mutex m_mutex;
  std::unique_ptr<ImageRead> m_imageToTransfer;
  std::atomic<bool> running; // Set to true in constructor
};

Затем на этапах процесса запрашивается изображение с getImage(), которым они однозначно владеют, как только эта функция вернется.Они обрабатывают его и передают в setImage следующего ProcessBoundary.

Возможно, вы могли бы улучшить это с помощью условных переменных или добавить очередь в этот класс, чтобы потоки могли вернуться к обработкеследующее изображениеОднако, если некоторые шаги выполняются быстрее, чем другие, в конечном итоге они будут остановлены медленными.

0 голосов
/ 25 мая 2018

Я бы использовал 3 отдельные очереди, ready_for_preprocessing, который подается InputStream и потребляется предварительной обработкой, ready_for_computation, который подается предварительной обработкой и потребляется вычислениями, и ready_for_output, который подается вычислениями ипотребляется OutputStream.

Вы хотите, чтобы каждая очередь находилась в классе, который имеет мьютекс доступа (для управления фактическим добавлением и удалением элементов из очереди) и семафор «доступно для изображения» (чтобы сигнализировать о наличии элементов)а также фактическая очередь.Это позволило бы несколько экземпляров каждого потока.Примерно так:

class imageQueue
{
    std::deque<ImageRead> m_readImages;
    std::mutex            m_changeQueue;
    Semaphore             m_imagesAvailable;

    public:
    bool addImage( ImageRead );
    ImageRead getNextImage();

}

addImage() принимает мьютекс m_changeQueue, добавляет изображение в m_readImages, затем сигнализирует m_imagesAvailable;

getNextImage() ожидает m_imagesAvailable.Когда он получает сигнал, он берет m_changeQueue, удаляет следующее изображение из списка и возвращает его.

cf.http://en.cppreference.com/w/cpp/thread

0 голосов
/ 25 мая 2018

Мне кажется, что ваше чтение и предварительная обработка могут выполняться независимо от контейнера.

Наивно, я бы структурировал это как разветвление, а затем разветвление сети задач.

Сначала выполните диспетчеризацию задача (задача - это единица работы, которая передается потоку для фактической работы), которая будет создавать задачи ввода и предварительной обработки.

Использовать фьючерсыкак средство для подзадач для передачи указателя на полностью загруженное изображение.

Создайте вторую задачу, задачу std :: vector builder , которая просто вызывает join набудущее, чтобы получить результаты, когда они будут сделаны, и добавить их в массив std::vector.

Я предлагаю вам структурировать вещи таким образом, потому что я подозреваю, что любой ввод-вывод и предварительная обработка, которые вы делаете, займет больше времени, чем установказначение в векторе.Использование tasks вместо потоков напрямую позволяет настроить параллельную часть вашей работы.

Я надеюсь, что это не слишком отвлечено от конкретных элементов.Я считаю, что эта схема хорошо сбалансирована между насыщением доступного оборудования, снижением конкуренции за блокировку и блокировку, и понятна в будущем - вы отладите его позже.

0 голосов
/ 25 мая 2018

Процесс, который вы описали как «Ввод -> предварительная обработка -> вычисление -> Вывод», является последовательным по своей структуре: каждый шаг зависит от предыдущего, поэтому распараллеливание в этом конкретном способе не выгодно, так как каждый поток просто долженждать другого, чтобы закончить.Попробуйте выяснить, какой шаг занимает больше всего времени, и распараллелить это.Или попытайтесь настроить несколько параллельных конвейеров обработки, которые работают последовательно на независимых, отдельных наборах данных.Обычный подход для этого - использовать очередь обработки, которая распределяет задачи по набору потоков.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...