Многопоточная одиночная читательская очередь fifo - PullRequest
8 голосов
/ 19 сентября 2009

Мне нужна очередь для передачи сообщений из одного потока (A) в другой (B), однако я не смог найти тот, который действительно делает то, что я хочу, так как они обычно позволяют добавить элемент в случае неудачи, случай, который в моей ситуации это в значительной степени фатально, поскольку сообщение должно быть обработано, и поток действительно не может остановиться и ждать свободного места.

  • Только нить A добавляет элементы, а только нить B читает их
  • Поток A никогда не должен блокироваться, однако поток B не критичен к производительности, поэтому он может
  • Добавление элементов всегда должно выполняться успешно, поэтому очередь не может иметь верхнего предела размера (если не хватает памяти в системе)
  • Если очередь пуста, поток B должен подождать, пока найдется элемент для обработки

Ответы [ 4 ]

7 голосов
/ 19 сентября 2009

Вот как написать очередь без блокировки в C ++:

http://www.ddj.com/hpc-high-performance-computing/210604448

Но когда вы говорите «поток А не должен блокировать», вы уверены, что это требование? Windows не является операционной системой реального времени (как и Linux при обычном использовании). Если вы хотите, чтобы поток А мог использовать всю доступную системную память, он должен выделить память (или подождать, пока кто-то другой это сделает). Сама ОС не может обеспечить гарантии синхронизации лучше, чем те, которые были бы у вас, если бы и читатель, и писатель взяли блокировку в процессе (т. Е. Неразделенный мьютекс) для манипулирования списком. И худший вариант добавления сообщения - пойти в ОС, чтобы получить память.

Короче говоря, есть причина, по которой те очереди, которые вам не нравятся, имеют фиксированную емкость - они не должны выделять память в предположительно низко-латентном потоке.

Таким образом, код без блокировки, как правило, будет менее блочным, но из-за распределения памяти это не гарантируется, и производительность с мьютексом не должна быть такой уж плохой, если у вас действительно огромный поток событий для обработки (например, вы пишете сетевой драйвер, а сообщения являются входящими пакетами Ethernet).

Итак, в псевдокоде первое, что я попробую, будет:

Writer:
    allocate message and fill it in
    acquire lock
        append node to intrusive list
        signal condition variable
    release lock

Reader:
    for(;;)
        acquire lock
            for(;;)
                if there's a node
                    remove it
                    break
                else
                   wait on condition variable
                endif
            endfor
        release lock
        process message
        free message
    endfor

Только если это приведет к недопустимым задержкам в потоке записи, я перейду к коду без блокировки (если только у меня не оказалось подходящей очереди, уже лежащей вокруг).

1 голос
/ 20 сентября 2009
  • Почему бы не использовать STL <<code>list> или <<code>deque> с мьютексом вокруг добавления / удаления? * потокобезопасность STL недостаточна?

  • Почему бы не создать свой собственный (поодиночке / вдвойне) класс связанного узла-списка, который содержит указатель, и элементы, которые будут добавлены / удалены, наследуются от этого? Таким образом, делая дополнительное распределение ненужным. Вы просто пометите несколько указателей в threadA::add() и threadB::remove(), и все готово. (Хотя вы захотите сделать это под мьютексом, эффект блокировки на threadA будет незначительным, если вы не сделаете что-то действительно неправильное ...)

  • Если вы используете pthreads, проверьте sem_post() и sem_wait(). Идея состоит в том, что threadB может блокировать бесконечно через sem_wait(), пока threadA не поместит что-то в очередь. Затем threadA вызывает sem_post(). Который пробуждает threadB, чтобы сделать это работа. После чего поток B может вернуться в режим сна. Это эффективный способ обработки асинхронной сигнализации, поддерживающий такие вещи, как несколько threadA::add() до завершения threadB::remove().

1 голос
/ 19 сентября 2009

Visual Studio 2010 добавляет 2 новые библиотеки, которые очень хорошо поддерживают этот сценарий, Библиотека асинхронных агентов и Параллельная библиотека шаблонов.

Библиотека агентов имеет поддержку или асинхронную передачу сообщений и содержит блоки сообщений для отправки сообщений в «цели» и для получения сообщений из «источников»

unbounded_buffer - это шаблонный класс, который предлагает то, что, я полагаю, вы ищете:

#include <agents.h>
#include <ppl.h>
#include <iostream>

using namespace ::Concurrency;
using namespace ::std;

int main()
{
   //to hold our messages, the buffer is unbounded...
   unbounded_buffer<int> buf1;
   task_group tasks;

   //thread 1 sends messages to the unbounded_buffer
   //without blocking
   tasks.run([&buf1](){
      for(int i = 0 ; i < 10000; ++i)
         send(&buf1,i)
     //signal exit 
     send(&buf1,-1);
   });

   //thread 2 receives messages and blocks if there are none

   tasks.run([&buf1](){
      int result;
      while(result = receive(&buf1)!=-1)
      {
           cout << "I got a " << result << endl;
      }
   });

   //wait for the threads to end
   tasks.wait();
}
0 голосов
/ 19 сентября 2009

Возможно, вы захотите рассмотреть свои требования - действительно ли это так, что А не может вообще отказаться от каких-либо элементов очереди? Или вы не хотите, чтобы B извлекал из очереди два последовательных элемента, из которых не поступали последовательные элементы, потому что это каким-то образом исказило бы последовательность событий?

Например, если это какая-то система регистрации данных, вы (по понятным причинам) не захотите пропусков в записи - но без неограниченной памяти реальность такова, что в каком-то угловом случае вы, вероятно, могли бы переполнить емкость очереди ..

В этом случае одним из решений является наличие какого-то особого элемента, который может быть помещен в очередь, что представляет собой случай, когда А обнаружил, что ему пришлось отбросить предметы. По сути, вы сохраняете один дополнительный элемент, который в большинстве случаев равен нулю. Каждый раз, когда A идет, чтобы добавить элементы в очередь, если этот дополнительный элемент не является нулевым, это входит. Если A обнаруживает, что в очереди нет места, тогда он настраивает этот дополнительный элемент, чтобы сказать: «эй, очередь была заполнена» .

Таким образом, A никогда не блокируется, вы можете отбрасывать элементы, когда система очень занята, но вы не упускаете из виду тот факт, что элементы были отброшены, потому что, как только пространство очереди становится доступным, эта метка входит в указать, где произошла потеря данных. Затем процесс B делает все, что ему нужно, когда он обнаруживает, что он вытянул этот элемент метки переполнения из очереди.

...