Как я могу получить элементы из ограниченного буфера в том порядке, в котором они были изначально созданы в C ++? - PullRequest
0 голосов
/ 26 ноября 2018

У меня довольно типичная проблема производителя / потребителя, которую я решил с помощью ограниченного буфера .Один процесс генерирует элементы и передает их N рабочим потокам.Рабочие потоки обрабатывают эти элементы и помещают результаты в ограниченный буфер.Конечный потребительский процесс извлекает готовые элементы из буфера.Следующая диаграмма потока данных иллюстрирует:

enter image description here

Каждому рабочему требуется переменное количество времени для обработки его элемента, поэтому рабочие вставляют готовые элементы в ограниченнуюбуфер в по существу случайном порядке.Это работает достаточно хорошо, но иногда необходимо получить готовые элементы в том же порядке , в котором они были изначально созданы.Итак, вопрос:

Как я могу изменить мою существующую реализацию для извлечения готовых элементов по порядку?

Важным дополнительным ограничением является то, что мы должны соблюдать размерограниченный буфер.Если буфер имеет размер M, то мы не можем иметь больше чем M готовых элементов, ожидающих потребителя в любой момент времени.

Ограниченный буфер

Ограниченный буферимеет простой интерфейс:

template <class T> class bounded_buffer
{
public:
  // initializes a new buffer
  explicit bounded_buffer(size_t capacity);
  // pushes an item into the buffer, blocks if full
  void push(T item);
  // pops an item from the buffer, blocks if empty
  T pop();
};

Обработка элемента

Рабочие потоки используют следующий код для обработки элемента:

std::unique_lock guard{ source_lock };
auto item = GetNextItem();
guard.unlock();

buffer.push(ProcessItem(std::move(item)));

(Фактический код является немного более сложным, поскольку он должен обрабатывать конец входных данных, ошибки отмены и обработки. Но эти детали не имеют отношения к вопросу.)

Код для получения законченногоэлемент просто выбрасывает ограниченный буфер:

auto processed_item = buffer.pop();

Ответы [ 2 ]

0 голосов
/ 27 ноября 2018

Вот мое предложение:

  1. Добавьте в класс WorkItem поле "Требуется".Это поле будет иметь тип shared_ptr<WorkItem> (или аналогичный).Если значение не равно NULL, в этом поле указывается зависимость между двумя WorkItem с - например, если для поля обязательно WorkItem B задано значение WorkItem A, это означает, что процессу потребителя необходимопотреблять A до B.

  2. Также добавлять к каждому WorkItem a condition variable (и связанным с ним mutex)

  3. Также добавьте к каждому WorkItem логическое «потребляемое» поле.Это поле по умолчанию будет false, однако, когда процесс потребителя потребляет WorkItem, он заблокирует WorkItem mutex, установит для этого поля значение true, вызовет notify_all() на WorkItem s condition variable, а затем разблокируйте mutex.

  4. Когда рабочий процесс завершает обработку WorkItem, он должен проверить поле «require»WorkItem.Если поле «require» имеет значение NULL, WorkItem может быть добавлено в ограниченную очередь немедленно, и задание рабочего процесса выполнено.

  5. В противном случае рабочий-process должен заблокировать mutex из поля «require» WorkItem и проверить его «потребляемую» переменную - если он установлен на true, рабочий процесс должен разблокировать mutex и поставить в очередь ссылку WorkItem, и его работа выполнена.

  6. Если мы попали сюда, то рабочий процесс не может поставить в очередь свой WorkItem, потому что его WorkItem имеет зависимость упорядочения, которая предотвращаетЭто.Таким образом, в этом случае рабочий процесс должен вызвать wait() на condition variable своей зависимости.Это переведет рабочий процесс в спящий режим до тех пор, пока его «require» - WorkItem не будет израсходован - в этот момент рабочий процесс проснется (через вызов notify_all() на шаге 3) и сможет поставить в очередь свой собственный WorkItem как обычно.

Эта логика должна быть достаточной для обеспечения правильного упорядочения при указании, при этом позволяя рабочим процессам работать максимально эффективно на WorkItems, которые неесть требования заказа потребления.

0 голосов
/ 26 ноября 2018

Я представлю два решения.Первый быстрый и простой.Вторая основывается на идее, лежащей в основе первой, чтобы получить что-то более эффективное.

Первый подход: std :: future

Основная идея заключается в том, что мы будем "зарезервировать пробел в ограниченном буфере, когда мы сначала получим значение и заполним его, когда закончим обработку элемента.std::future предоставляет готовый механизм для достижения этой цели.Вместо использования bounded_buffer<T> мы будем использовать bounded_buffer<std::future<T>>.Мы настраиваем рабочий код следующим образом:

std::unique_lock guard{ source_lock };
auto item = GetNextItem();    
std::promise<T> processed_item;
buffer.push(processed_item.get_future());
guard.unlock();

processed_item.set_value(ProcessItem(std::move(item)));

Затем настраиваем потребительский код одним касанием, чтобы извлечь значение из будущего:

auto processed_item = buffer.pop().get();

Если потребительский процесс получает элементдо того, как рабочий закончил с этим, тогда std::future<T>::get будет гарантировать, что потребитель блокирует, пока элемент не будет готов.

Плюсы:

  • Относительно просто, и это решает проблему.Мы помещаем фьючерсы в ограниченный буфер, удерживая блокировку источника, так что это гарантирует, что окончательные результаты поступают в буфер в том же порядке, в котором мы их извлекали из источника.
  • Не требует каких-либо изменений в ограниченном буфересам, сохраняя чистоту этой абстракции.

Минусы:

  • std::future относительно тяжелый, требующий дополнительного выделения памяти и внутренней синхронизации.
  • Теперь мы удерживаем блокировку источника, пока мы нажимаем в буфер (и нажатие может блокировать);это, вероятно, хорошо, но потенциально проблематично, если GetNextItem() стоит дорого.

Второй подход: создать лучший буфер

Для решения проблем производительности в первомподход, мы можем настроить реализацию ограниченного буфера, чтобы встроить идею резервирования пространства в нем.Мы сделаем три изменения в его интерфейсе:

  1. Изменим конструктор для принятия предиката.
  2. Изменим метод push, чтобы вернуть локатор.
  3. Добавить новыйreplace метод, который принимает локатор и значение.

Модифицированный интерфейс выглядит следующим образом:

template <class T, class P> class bounded_buffer
{
public:
  using locator_type = /* unspecified */;
  // initializes a new buffer; an item is "available" if and only if it
  // satisfies this predicate
  explicit bounded_buffer(size_t capacity, P predicate);
  // pushes an item into the buffer, blocks if full; the buffer's count of
  // available items will increase by one if and only if all items in the
  // buffer (including the new one) are available
  locator_type push(T item);
  // pops an item from the buffer, blocks if empty
  T pop();
  // replaces an existing item in the buffer; if the item is the first in the
  // buffer, then we set the count of available items as follows: 0 if the
  // item is unavailable, or X if it is available where X is the number of 
  // available items at the front of the buffer
  void replace(locator_type location, T item);
};

Затем мы меняем тип, хранящийся в ограниченном буфере, с T до std::variant<std::monostate, T>.Предикат будет считать элемент «доступным», если он содержит T. Мы изменим рабочий код следующим образом:

std::unique_lock guard{ source_lock };
auto item = GetNextItem();      
auto location = buffer.push(std::monostate{});
guard.unlock();

buffer.replace(location, ProcessItem(std::move(item));

Код извлечения в потребителе также должен измениться, чтобы извлечь значение из варианта:

auto processed_item = std::get<1>(buffer.pop());

Плюсы:

  • Более легкий и, следовательно, более производительный, чем подход std::future.(Для хранения признаков std::variant требуется лишь немного больше памяти, чем в исходной версии.)
  • Решает проблему в основном так же, как и версия future.

Минусы:

  • Требуются изменения в реализации ограниченного буфера, и его базовые операции больше не соответствуют ожидаемым для этой абстракции.
  • Не устраняет выявленную проблему блокировки источникавыше.

Обработка ошибок

Я упустил обработку ошибок для простоты.Тем не менее, для обоих подходов требуется правильная обработка исключений.Если во время обработки элемента с написанным кодом возникнет исключительная ситуация, потребитель будет зависать, потому что он будет ожидать зарезервированного элемента, который никогда не будет доставлен.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...