Как я могу синхронизировать три потока? - PullRequest
1 голос
/ 14 октября 2010

Мое приложение состоит из основного процесса и двух потоков, все из которых работают одновременно и используют три очереди fifo:

Five-q - это Qmain, Q1 и Q2.Внутри каждой очереди используется счетчик, который увеличивается, когда элемент помещается в очередь, и уменьшается, когда элемент «извлекается» из очереди.

В обработке участвуют два потока,QMaster, который получают из Q1 и Q2 и помещают в Qmain,Монитор, который положил в Q2,и основной процесс, который получает из Qmain и помещает в Q1.

Цикл QMaster-thread последовательно проверяет счетчики Q1 и Q2 и, если какие-либо элементы находятся в q, он получает их и помещает их в Qmain.

Цикл Monitor-thread получает данные из внешних источников, упаковывает их и помещает в Q2.

Основной процесс приложения также запускает цикл, проверяющий счетчик Qmain, иесли есть какие-либо элементы, получайте элемент из Qmain на каждой итерации цикла и обрабатывайте его дальше.Во время этой обработки он иногда помещает элемент в Q1 для последующей обработки (когда он по очереди получает от Qmain).

Проблема:Я реализовал все как описано выше, и он работает в течение случайного (короткого) времени, а затем зависает.Мне удалось определить причину сбоя в увеличении / уменьшении счетчика fifo-q (это может произойти в любом из них).

Что я пробовал:Используя три мьютекса: QMAIN_LOCK, Q1_LOCK и Q2_LOCK, которые я блокирую всякий раз, когда любая операция get / put выполняется на соответствующем fifo-q.Результат: приложение не запускается, просто зависает.

Основной процесс должен постоянно работать, не должен блокироваться при чтении (сбой именованных каналов, сбой сокета пары).

Есть совет?Я думаю, что неправильно внедряю мьютекс, как это должно быть сделано?(Любые комментарии по улучшению вышеуказанного дизайна также приветствуются)

[править] ниже приведены процессы и шаблон fifo-q:Где и как в этом месте разместить мьютекс, чтобы избежать проблем, описанных выше?

main-process:
...
start thread QMaster
start thread Monitor
...
while (!quit)
{
    ...
    if (Qmain.count() > 0)
    {
        X = Qmain.get();
        process(X) 
            delete X;
    }
    ...
    //at some random time:
    Q2.put(Y);
    ...
}

Monitor:
{
    while (1)
    {
        //obtain & package data
        Q2.put(data)
    }
}

QMaster:
{
    while(1)
    {
        if (Q1.count() > 0)
            Qmain.put(Q1.get());

        if (Q2.count() > 0)
            Qmain.put(Q2.get());
    }
}

fifo_q:
template < class X* > class fifo_q
{
    struct item
    {
        X* data;
        item *next;
        item() { data=NULL; next=NULL; }
    }
    item *head, *tail;
    int count;
public:
    fifo_q() { head=tail=NULL; count=0; }
    ~fifo_q() { clear(); /*deletes all items*/ }
    void put(X x) { item i=new item(); (... adds to tail...); count++; }
    X* get() { X *d = h.data; (...deletes head ...); count--; return d; }
    clear() {...}
};

Ответы [ 6 ]

1 голос
/ 14 октября 2010

Пример того, как бы я адаптировал дизайн и заблокировал доступ к очереди способом posix. Заметьте, что я бы обернул мьютекс, чтобы использовать RAII или использовал boost-threading, и что я бы использовал stl :: deque или stl :: queue в качестве очереди, но оставаясь как можно ближе к вашему коду:

main-process:
...
start thread Monitor
...
while (!quit)
{
    ...
    if (Qmain.count() > 0)
    {
        X = Qmain.get();
        process(X) 
            delete X;
    }
    ...
    //at some random time:
    QMain.put(Y);
    ...
}

Monitor:
{
    while (1)
    {
        //obtain & package data
        QMain.put(data)
    }
}

fifo_q:
template < class X* > class fifo_q
{
    struct item
    {
        X* data;
        item *next;
        item() { data=NULL; next=NULL; }
    }
    item *head, *tail;
    int count;
    pthread_mutex_t m;
public:
    fifo_q() { head=tail=NULL; count=0; }
    ~fifo_q() { clear(); /*deletes all items*/ }
    void put(X x) 
    { 
      pthread_mutex_lock(&m);
      item i=new item(); 
      (... adds to tail...); 
      count++; 
      pthread_mutex_unlock(&m);
    }
    X* get() 
    { 
      pthread_mutex_lock(&m);
      X *d = h.data; 
      (...deletes head ...); 
      count--; 
      pthread_mutex_unlock(&m);
      return d; 
    }
    clear() {...}
};

Также отметим, что мьютекс все еще нужно инициализировать, как в примере здесь , и что count () также должен использовать мьютекс

1 голос
/ 14 октября 2010

Я написал простое приложение ниже:

#include <queue>
#include <windows.h>
#include <process.h>
using namespace std;

queue<int> QMain, Q1, Q2;
CRITICAL_SECTION csMain, cs1, cs2;

unsigned  __stdcall TMaster(void*)
{
    while(1)
    {
        if( Q1.size() > 0)
        {
            ::EnterCriticalSection(&cs1);
            ::EnterCriticalSection(&csMain);
            int i1 = Q1.front();
            Q1.pop();
            //use i1;
            i1 = 2 * i1;
            //end use;
            QMain.push(i1);
            ::LeaveCriticalSection(&csMain);
            ::LeaveCriticalSection(&cs1);
        }
        if( Q2.size() > 0)
        {
            ::EnterCriticalSection(&cs2);
            ::EnterCriticalSection(&csMain);
            int i1 = Q2.front();
            Q2.pop();
            //use i1;
            i1 = 3 * i1;
            //end use;
            QMain.push(i1);
            ::LeaveCriticalSection(&csMain);
            ::LeaveCriticalSection(&cs2);
        }
    }
    return 0;
}

unsigned  __stdcall TMoniter(void*)
{
    while(1)
    {
        int irand = ::rand();
        if ( irand % 6 >= 3)
        {
            ::EnterCriticalSection(&cs2);
            Q2.push(irand % 6);
            ::LeaveCriticalSection(&cs2);
        }
    }
    return 0;
}

unsigned  __stdcall TMain(void)
{
    while(1)
    {
        if (QMain.size() > 0)
        {
            ::EnterCriticalSection(&cs1);
            ::EnterCriticalSection(&csMain);
            int i = QMain.front();
            QMain.pop();
            i = 4 * i;
            Q1.push(i);
            ::LeaveCriticalSection(&csMain);
            ::LeaveCriticalSection(&cs1);
        }
    }
    return 0;
}

int _tmain(int argc, _TCHAR* argv[])
{
    ::InitializeCriticalSection(&cs1);
    ::InitializeCriticalSection(&cs2);
    ::InitializeCriticalSection(&csMain);
    unsigned threadID;
    ::_beginthreadex(NULL, 0, &TMaster, NULL, 0, &threadID);
    ::_beginthreadex(NULL, 0, &TMoniter, NULL, 0, &threadID);
    TMain();

    return 0;
}
1 голос
/ 14 октября 2010

Используйте отладчик. Когда ваше решение с мьютексами зависнет, посмотрите, что делают потоки, и вы получите хорошее представление о причине проблемы.

Какая у вас платформа? В Unix / Linux вы можете использовать очереди сообщений POSIX (вы также можете использовать очереди сообщений System V, сокеты, FIFO, ...), так что вам не нужны мьютексы.

Узнайте о условных переменных. По вашему описанию это выглядит так, как будто ваш Qmaster-поток занят зацикливанием и сжиганием вашего процессора.

Один из ваших ответов предполагает, что вы делаете что-то вроде:

Q2_mutex.lock()
Qmain_mutex.lock()
Qmain.put(Q2.get())
Qmain_mutex.unlock()
Q2_mutex.unlock()

но вы, вероятно, хотите сделать это так:

Q2_mutex.lock()
X = Q2.get()
Q2_mutex.unlock()

Qmain_mutex.lock()
Qmain.put(X)
Qmain_mutex.unlock()

и, как предположил Грегори, инкапсулируйте логику в get / put.

РЕДАКТИРОВАТЬ: Теперь, когда вы разместили свой код, мне интересно, это учебное упражнение? Потому что я вижу, что вы кодируете свой собственный класс очереди FIFO вместо того, чтобы использовать стандартный std :: queue C ++. Я полагаю, вы очень хорошо протестировали свой класс, и проблема не в этом.

Кроме того, я не понимаю, зачем вам три разные очереди. Кажется, что очереди Qmain будет достаточно, и тогда вам не понадобится поток Qmaster, который действительно занят ожиданием.

Что касается инкапсуляции, вы можете создать класс synch_fifo_q, который инкапсулирует класс fifo_q. Добавьте приватную переменную mutex, а затем открытые методы (put, get, clear, count, ...) должны быть похожи на put (X) {lock m_mutex; m_fifo_q.put (Х); разблокировать m_mutex; }

вопрос: что произойдет, если у вас будет несколько читателей из очереди? Гарантируется ли, что после «count ()> 0» вы можете выполнить «get ()» и получить элемент?

1 голос
/ 14 октября 2010

Вы не должны блокировать второй мьютекс, если он уже заблокирован.

Поскольку вопрос помечен C ++, я предлагаю реализовать блокировку внутри логики get / add класса очереди (например, с помощью форсированных блокировок) или написать оболочку, если ваша очередь не является классом.

Это позволяет упростить логику блокировки.

Относительно добавленных вами источников: проверка размера очереди и последующие операции put / get должны выполняться в одной транзакции, в противном случае другой поток может редактировать очередь в диапазоне

0 голосов
/ 14 октября 2010

1 может возникнуть проблема из-за этого правила «Основной процесс должен продолжаться все время, не должен блокироваться при чтении».Как вы это реализовали?В чем разница между «get» и «read»?

Кажется, проблема в вашей реализации, а не в логике.И, как вы заявили, вы не должны находиться в тупиковой блокировке, потому что вы не получаете другую блокировку, будь то в блокировке.

0 голосов
/ 14 октября 2010

Приобретаете ли вы несколько замков одновременно? Как правило, это то, что вы хотите избежать. Если необходимо, убедитесь, что вы всегда получаете блокировки в одном и том же порядке в каждом потоке (это более ограничивает ваш параллелизм и почему вы вообще хотите его избегать).

Другие рекомендации по параллелизму: Получаете ли вы блокировку до чтения размеров очереди? Если вы используете мьютекс для защиты очередей, то реализация вашей очереди не является параллельной, и вам, вероятно, необходимо получить блокировку перед чтением размера очереди.

...