Проект параллельной обработки двойной буферной системы? - PullRequest
0 голосов
/ 14 февраля 2012

У меня есть долго работающее приложение, которое в основном:

  1. считывание пакетов из сети
  2. сохранение его где-то
  3. обработка его и вывод на диск

Действительно очень распространенный вариант использования - за исключением того, что размер и скорость передачи данных могут быть довольно большими.Чтобы избежать переполнения памяти и повысить эффективность, я имею в виду схему с двумя буферами, где чередуются буферы A и B: пока A удерживает сетевой пакет, B обрабатывается для вывода.Как только буфер А достигает мягкой границы, А должен обрабатывать выходные данные, а Б будет использоваться для хранения сетевых пакетов.

Я не особенно разбираюсь в парадигме параллельной / многопоточной программы.Я читал некоторые прошлые дискуссии о циклическом буфере, который обрабатывает случай с несколькими производителями и несколькими потребителями.Я не уверен, что это лучшее решение, и кажется, что упрощает проектирование двойного буфера .

Мой вопрос: есть ли шаблон проектирования, которому я могу следовать, чтобы решить проблему?или лучший дизайн в этом отношении?Если возможно, используйте псевдокод, чтобы проиллюстрировать решение.Спасибо.

Ответы [ 3 ]

2 голосов
/ 14 февраля 2012

Я предлагаю вам вместо использования двух (или любых фиксированных количество ...) буферов просто использовать очередь и, следовательно, "производитель / потребительские отношения.

Процесс, который получает пакеты, просто добавляет их в буфер определенного размера, и, когда буфер достаточно заполнен или указанный (короткий ...) временной интервал истек, местами (не пустой) буфер в очередь для обработки другим. Затем он выделяет новый буфер для собственного использования.

Приемный («другой ...») процесс пробуждается в любое время, когда может быть новым буфером в очереди для обработки. Он удаляет буфер, обрабатывает его, затем снова проверяет очередь. Спит только тогда, когда обнаружит, что очередь пуста. (Будьте внимательны, чтобы убедиться, что процесс не может принять решение о том, чтобы заснуть в тот самый момент, когда другой процесс решит дать ему сигнал ... здесь не должно быть "состояния гонки".)

Попробуйте просто выделить хранилище «для каждого сообщения» (что бы «сообщение» не значило для вас) и поместить это «сообщение» в очередь, чтобы не было ненужной задержки при обработке, вызванной «ожиданием буфера» заполнить."

1 голос
/ 14 февраля 2012

Возможно, стоит упомянуть технику, используемую при обработке / записи звука в реальном времени, которая использует один кольцевой буфер (или fifo, если вы предпочитаете этот термин) достаточного размера.

Вам понадобится курсор для чтения и записи.(Независимо от того, нужна ли вам блокировка или вы можете сделать это с помощью энергозависимых барьеров плюс барьеры памяти, это очень сложный вопрос, но люди из portaudio советуют вам делать это без блокировок, если важна производительность.)

Вы можете использовать один поток для чтенияи еще один поток, чтобы написать.Поток чтения должен использовать как можно больше буфера.Вы будете в безопасности, если не исчерпаете буферное пространство, но оно существует и для решения с двумя буферами.Таким образом, основное предположение заключается в том, что вы можете записывать на диск быстрее, чем вводить данные, или вам нужно будет расширить решение.

0 голосов
/ 14 февраля 2012

Найдите класс очереди производитель-потребитель, который работает. Используйте один для создания пула буферов, чтобы улучшить производительность и контролировать использование памяти. Используйте другой для переноса буферов из сетевого потока в поток диска:

#define CnumBuffs 128
#define CbufSize  8182
#define CcacheLineSize 128

    public class netBuf{
      private char cacheLineFiller[CcacheLineSize];  // anti false-sharing space
      public int dataLen;
      public char bigBuf[CbufSize];
    };

    PCqueue pool;
    PCqueue diskQueue;
    netThread Thread;
    diskThread Thread;

    pool=new(PCqueue);
    diskQueue=new(PCqueue);

    // make an object pool
    for(i=0;i<CnumBuffs,i++){
      pool->push(new(netBuf));
    };

    netThread=new(netThread);
    diskThread=new(diskThread);
    netThread->start();
    diskThread->start();
    ..

    void* netThread.run{
      netbuf *thisBuf;
      for(;;){
        pool->pop(&thisBuf};  // blocks if pool empty
        netBuf->datalen=network.read(&thisBuf.bigBuf,sizeof(thisBuf.bigBuf));
        diskQueue->push(thisBuf);
      };
    };

    void* diskThread.run{
      fileStream *myFile;
      diskBuf *thisBuf;
      new myFile("someFolder\fileSpec",someEnumWrite);
      for(;;){
        diskQueue->pop(&thisBuf};  // blocks until buffer available
        myFile.write(&thisBuf.bigBuf,thisBuf.dataLen);
        pool->push(thisBuf};
      };
    };
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...