Многопоточность - PullRequest
       5

Многопоточность

3 голосов
/ 30 июня 2011

Я только начал изучать многопоточность. Я написал простое приложение. Приложение создает три потока. Два потока пишут и один поток читает. Потоки записи пишут в отдельное место в глобальном массиве. Поток писателя после увеличения значения в массиве уведомляет читателя. Затем поток чтения уменьшает это значение в массиве и снова ждет, когда потоки записи обновят свое соответствующее значение в массиве. Код для приложения вставлен ниже.

Что я вижу, так это то, что потоки писателя (продюсера) получают больше времени, чем поток читателя (потребителя). Я думаю, что я делаю что-то не так. Если выходные данные приложения перенаправляются в файл, то можно заметить, что поступает больше последовательных сообщений от производителей, а сообщения от потребителя происходят нечасто. Я ожидал, что, когда производитель обновляет свои данные, потребитель сразу же обрабатывает их, то есть после каждого сообщения источника должно быть напечатано сообщение потребителя.

Спасибо и всего наилучшего,

~ подключи

#include <stdio.h>

#include <pthread.h>

const long g_lProducerCount = 2; /*Number of Producers*/

long g_lProducerIds[2]; /*Producer IDs = 0, 1...*/
long g_lDataArray[2]; /*Data[0] for Producer 0, Data[1] for Producer 1...*/

/*Producer ID that updated the Data. -1 = No update*/
long g_lChangedProducerId = -1;

pthread_cond_t g_CondVar = PTHREAD_COND_INITIALIZER;
pthread_mutex_t g_Mutex  = PTHREAD_MUTEX_INITIALIZER;
pthread_t g_iThreadIds[3]; /*3 = 2 Producers + 1 Consumer*/

unsigned char g_bExit = 0; /*Exit application? 0 = No*/

void* Producer(void *pvData)
{
    long lProducerId = *(long*)pvData; /*ID of this Producer*/

    while(0 == g_bExit) {
        pthread_mutex_lock(&g_Mutex);

        /*Tell the Consumer who's Data is updated*/
        g_lChangedProducerId = lProducerId;

        /*Update the Data i.e. Increment*/
        ++g_lDataArray[lProducerId];

        printf("Producer: Data[%ld] = %ld\n",
                lProducerId, g_lDataArray[lProducerId]);

        pthread_cond_signal(&g_CondVar);
        pthread_mutex_unlock(&g_Mutex);
    }

    pthread_exit(NULL);
}

void* Consumer(void *pvData)
{
    while(0 == g_bExit) {
        pthread_mutex_lock(&g_Mutex);

        /*Wait until one of the Producers update it's Data*/
        while(-1 == g_lChangedProducerId) {
            pthread_cond_wait(&g_CondVar, &g_Mutex);
        }

        /*Revert the update done by the Producer*/
        --g_lDataArray[g_lChangedProducerId];

        printf("Consumer: Data[%ld] = %ld\n",
                g_lChangedProducerId, g_lDataArray[g_lChangedProducerId]);

        g_lChangedProducerId = -1; /*Reset for next update*/

        pthread_mutex_unlock(&g_Mutex);
    }

    pthread_exit(NULL);
}

void CreateProducers()
{
    long i;

    pthread_attr_t attr;
    pthread_attr_init(&attr);

    for(i = 0; i < g_lProducerCount; ++i) {
        g_lProducerIds[i] = i;
        pthread_create(&g_iThreadIds[i + 1], &attr,
                        Producer, &g_lProducerIds[i]);
    }

    pthread_attr_destroy(&attr);
}

void CreateConsumer()
{
    pthread_attr_t attr;
    pthread_attr_init(&attr);

    pthread_create(&g_iThreadIds[0], &attr, Consumer, NULL);

    pthread_attr_destroy(&attr);
}

void WaitCompletion()
{
    long i;
    for(i = 0; i < g_lProducerCount + 1; ++i) {
        pthread_join(g_iThreadIds[i], NULL);
    }
}

int main()
{
    CreateProducers();
    CreateConsumer();

    getchar();

    g_bExit = 1;

    WaitCompletion();

    return 0;
}

Ответы [ 3 ]

1 голос
/ 01 июля 2011

Вы должны были бы уточнить, чего именно вы хотите достичь. На данный момент производители увеличивают только целое число, а потребитель уменьшает значение. Это не очень полезное занятие;) Я понимаю, что это всего лишь тестовое приложение, но все еще не ясно, какова цель этой обработки, каковы ограничения и т. Д.

Производители производят несколько «предметов». Результат этого производства представлен в виде целочисленного значения. 0 означает отсутствие элементов, 1 означает, что есть ожидающий элемент, который может принять потребитель. Это правильно? Теперь, возможно ли, чтобы производитель произвел несколько элементов до того, как какой-либо из них будет израсходован (увеличивая ячейку массива до значения выше 1)? Или он должен ждать, пока последний предмет будет израсходован, прежде чем следующий будет помещен в хранилище? Хранение ограничено или неограничено? Если оно ограничено, то распространяется ли ограничение на всех производителей или определено для каждого производителя?

То, что я ожидал, было то, что, когда производитель обновляет свои данные, Потребитель немедленно обрабатывает его, то есть после каждого Производителя. сообщение должно быть напечатано сообщение потребителя.

Хотя не совсем понятно, чего вы хотите достичь, я придерживаюсь этой цитаты и предполагаю следующее: существует ограничение в 1 элемент на производителя, и производитель должен ждать, пока потребитель опустошит хранилище, прежде чем новый элемент может быть помещен в ячейку, т.е. единственные допустимые значения в массиве g_lDataArray - 0 и 1.

Чтобы обеспечить максимальный параллелизм между потоками, вам понадобится пара условных переменных / мьютексов для каждой ячейки g_lDataArray (для каждого производителя). Вам также понадобится очередь обновлений, представляющая собой список производителей, которые представили свои работы, и пара условных переменных / мьютексов для ее защиты. Это заменит g_lChangedProducerId, который может содержать только одно значение за раз.

Каждый раз, когда производитель хочет поместить элемент в хранилище, он должен получить соответствующую блокировку, проверить, не является ли хранилище пустым (g_lDataArray [lProducerId] == 0), если не ожидать переменную условия, а затем увеличить значение. ячейка, снять удерживаемую блокировку, получить блокировку потребителя, добавить его идентификатор в очередь обновлений, уведомить потребителя, снять блокировку потребителя. Конечно, если производитель выполнит какие-либо реальные вычисления, производящие некоторый реальный элемент, эта работа должна быть выполнена вне области какой-либо блокировки перед попыткой поместить элемент в хранилище.

В псевдокоде это выглядит так:

// do some computations
item = compute();

lock (mutexes[producerId]) {
    while (storage[producerId] != 0)
        wait(condVars[producerId]);
    storage[producerId] = item;
}

lock (consumerMutex) {
    queue.push(producerId);
    signal(consumerCondVar);
}        

Потребитель должен действовать следующим образом: получить свою блокировку, проверить, есть ли какие-либо ожидающие обновления для обработки, если не ожидать переменную условия, вынуть одно обновление из очереди (то есть номер производителя обновлений), получить блокировку для производителя, который будет обрабатывать обновление, уменьшить ячейку, уведомить производителя, снять блокировку производителя, снять его блокировку и, наконец, обработать обновление.

lock (consumerMutex) {
    while (queue.isEmpty())
        wait(consumerCondVar);
    producerId = queue.pop();
    lock (mutexex[producerId]) {
        item = storage[producerId];
        storage[producerId] = 0;
        signal(condVars[producerId]);
    }
}

//process the update
process(item);

Надеюсь, вам нужен этот ответ.

0 голосов
/ 02 апреля 2013

Что ж, когда ваш продюсер произвел, он может разбудить ProThread или ConThread. И если он разбудил ProThread, производитель произвел снова, и ConThread не потреблял сразу после того, как данные произведены Это то, что вы не хотите видеть. Все, что вам нужно, - это убедиться, что когда он произойдет, он не разбудит ProThread. Вот один вид решения для этого

    void* Producer(void *pvData)
    {
            ........
            //wait untill consumer consume its number
            while(-1!=g_lChangedProducerId)
                    pthread_cond_wait(&g_CondVar,&g_Mutex);
            //here to inform the consumer it produced the data
            g_lChangedProducerId = lProducerId;
            ........
    }

    void* Consumer(void *pvData)
    {
            g_lChangedProducerId = -1;
            **//wake up the producer when it consume
            pthread_cond_signal(&g_CondVar);**
            pthread_mutex_unlock(&g_Mutex);
    }
0 голосов
/ 30 июня 2011

Проблема может заключаться в том, что все производители изменяют g_lChangedProducerId, поэтому значение, записанное одним производителем, может быть перезаписано другим производителем до того, как его увидит потребитель.

Это означает, что потребитель фактически не видит, что потребительПервый производитель произвел некоторую продукцию.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...