потребитель / производитель в c ++ - PullRequest
3 голосов
/ 22 февраля 2012

Это классическая проблема c / p, когда некоторые потоки производят данные, а другие читают данные.Как производитель, так и потребители используют общий размер буфера.Если буфер пуст, то потребители должны ждать, а если он заполнен, то производитель должен ждать.Я использую семафоры для отслеживания полных или пустых очередей.Производитель собирается уменьшить семафор свободных пятен, добавить значение и увеличить семафор заполненных слотов.Поэтому я пытаюсь реализовать программу, которая получает некоторые числа из функции генератора, а затем выводит среднее число.Рассматривая это как проблему производителя-потребителя, я пытаюсь сэкономить некоторое время при выполнении программы.Функция generateNumber вызывает некоторую задержку в процессе, поэтому я хочу создать ряд потоков, которые генерируют числа, и поместить их в очередь.Затем «главный поток», который выполняет основную функцию, должен прочитать из очереди и найти сумму, а затем усреднить.Итак, вот что у меня есть:

#include <cstdio> 
#include <cstdlib>
#include <time.h>
#include "Thread.h" 
#include <queue> 

int generateNumber() {
    int delayms = rand() / (float) RAND_MAX * 400.f + 200;
    int result = rand() / (float) RAND_MAX * 20;
    struct timespec ts;
    ts.tv_sec = 0;
    ts.tv_nsec = delayms * 1000000;
    nanosleep(&ts, NULL);
    return result; }


struct threadarg {
    Semaphore filled(0);
    Semaphore empty(n);
    std::queue<int> q; };


void* threadfunc(void *arg) {
    threadarg *targp = (threadarg *) arg;
    threadarg &targ = *targp;
    while (targ.empty.value() != 0) {
        int val = generateNumber();
        targ.empty.dec(); 
        q.push_back(val);
        targ.filled.inc(); }
}
int main(int argc, char **argv) {
    Thread consumer, producer;
    // read the command line arguments
    if (argc != 2) {
        printf("usage: %s [nums to average]\n", argv[0]);
        exit(1); }
    int n = atoi(argv[1]);
    // Seed random number generator
    srand(time(NULL));
}

Я немного запутался, потому что я не уверен, как создать несколько потоков производителей, которые генерируют числа (если q не заполнено), пока потребительчтение из очереди (то есть, если q не пусто).Я не уверен, что поставить в главном, чтобы это произошло.также в «Thread.h» вы можете создать поток, мьютекс или семафор.Поток имеет методы .run (threadFunc, arg), .join () и т. Д. Мьютекс может быть заблокирован или разблокирован.Все методы семафора были использованы в моем коде.

Ответы [ 4 ]

4 голосов
/ 22 февраля 2012

Ваша очередь не синхронизирована, поэтому несколько производителей могут звонить push_back одновременно или одновременно, когда потребитель звонит pop_front ... это прервется.

Простой подход к выполнению этой работы состоит в том, чтобы использовать потокобезопасную очередь, которая может быть оберткой вокруг std::queue, который у вас уже есть, плюс мьютекс.

Вы можете начать с добавления мьютекса и блокировки / разблокировки его при каждом переадресации вызова на std::queue - для одного потребителя, которого должно быть достаточно, для нескольких потребителей вам необходимо объединить front() и * 1010. * в один синхронизированный вызов.

Чтобы разрешить блокировку потребителя, пока очередь пуста, вы можете добавить переменную условия в свою оболочку.

Этого должно быть достаточно, чтобы найти ответ в Интернете - образец кода ниже.


template <typename T> class SynchronizedQueue
{
    std::queue<T> queue_;
    std::mutex mutex_;
    std::condition_variable condvar_;

    typedef std::lock_guard<std::mutex> lock;
    typedef std::unique_lock<std::mutex> ulock;

public:
    void push(T const &val)
    {
        lock l(mutex_); // prevents multiple pushes corrupting queue_
        bool wake = queue_.empty(); // we may need to wake consumer
        queue_.push(val);
        if (wake) condvar_.notify_one();
    }

    T pop()
    {
        ulock u(mutex_);
        while (queue_.empty())
            condvar_.wait(u);
        // now queue_ is non-empty and we still have the lock
        T retval = queue_.front();
        queue_.pop();
        return retval;
    }
};

Замените std::mutex и др. Теми примитивами, которые дает вам ваш "Thread.h".

1 голос
/ 22 февраля 2012

Что бы я сделал, это:

  • Создайте класс данных, который скрывает вашу очередь
  • Создайте потокобезопасные методы доступа для сохранения фрагмента данных в q, иудаление части данных из q (я бы использовал один мьютекс или критическую секцию для аксессоров)
  • Обработайте случай, когда у потребителя нет данных для работы (сон)
  • Обработать случай, когда q становится слишком полным, и производителям нужно замедлить
  • Пусть потоки идут волей-неволей, добавляя и удаляя по мере их производства / потребления

Кроме того, не забудьте добавить спящий режим в каждый поток, иначе вы будете привязывать ЦП и не обеспечите планировщику потоков хорошее место для переключения контекстов и совместного использования ЦП с другими потоками / процессами.Вам не нужно, но это хорошая практика.

0 голосов
/ 22 февраля 2012

Защитите доступ к очереди с помощью мьютекса, который должен быть этим. Ограниченная очередь «Computer Science 101» для производителя-потребителя требует двух семафоров (для управления подсчетом свободного / пустого пространства и ожиданиям производителей / потребителей, как вы это уже делаете) и одного mutex / futex / критического сегмента для защиты очереди .

Я не понимаю, как замена семафоров и мьютексов на condvars - это большая помощь. В чем смысл? Как реализовать ограниченную очередь производителей и потребителей с condvars, которая работает на всех платформах с несколькими производителями / потребителями?

0 голосов
/ 22 февраля 2012

При управлении таким общим состоянием вам нужны переменная условия и мьютекс.Базовым шаблоном является функция, которая выглядит следующим образом:

ScopedLock l( theMutex );
while ( !conditionMet ) {
    theCondition.wait( theMutex );
}
doWhatever();
theCondition.notify();

В вашем случае, я бы, вероятно, сделал условную переменную и члены мьютекса класса, реализующие очередь.Чтобы написать, conditionMet будет !queue.full(), так что вы получите что-то вроде:

ScopedLock l( queue.myMutex );
while ( queue.full() ) {
    queue.myCondition.wait();
}
queue.insert( whatever );
queue.myCondition.notify();

и читать:

ScopedLock l( queue.myMutex );
while ( queue.empty() ) {
    queue.myCondition.wait();
}
results = queue.extract();
queue.myCondition.notify();
return results;

В зависимости от интерфейса потоков, может быть две функции notify: уведомить одну (которая пробуждает один поток) и уведомить всех (которая пробуждает все ожидающие потоки);в этом случае вам нужно будет уведомить всех (или вам понадобятся две условные переменные, одна для пробела для записи и одна для чего-то для чтения, причем каждая функция ожидает одной, но уведомляет другую).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...