Потокобезопасная очередь не является поточно-ориентированной, если peek возвращает ссылку, даже если ссылка никогда не используется (со сборкой!) - PullRequest
1 голос
/ 23 февраля 2010

У меня очень простая реализация очереди, которая оборачивает фиксированный массив. Он содержит взгляд, очередь и dequeue. Если peek возвращает ссылку, я обнаружил, что в конечном итоге он будет возвращать противоречивые результаты (противоречивые результаты означают, что он вернет 2 разных значения без каких-либо промежуточных очередей или очереди). Очевидно, это может произойти, если эта ссылка будет сохранена и изменена, но, насколько я могу судить, это не так. Фактически, повторный вызов peek дает ожидаемый результат.

Ниже приведен код с многопоточностью Windows и мьютексами. Я также попробовал это с помощью pthreads в Linux с тем же результатом. Я, очевидно, что-то не понимаю ... Я сбросил исполняемый файл и нашел единственную разницу между возвратом ссылки или значения, когда разыменовывается память. Например:

Если ссылка возвращается, peek содержит:
lea eax,[edx+ecx*4+8]
А потом в потребительской ветке:
cmp dword ptr [eax],1

Но, если возвращается значение, peek содержит:
mov eax,dword ptr [edx+ecx*4+8]
А потом в потребительской ветке:
cmp eax,1

Спасибо!

#include <iostream>
#include <windows.h>

typedef void *(thread_func_type)(void *);

void start_thread(HANDLE &thread, thread_func_type *thread_func, void *arg)
{
    DWORD id;
    thread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)thread_func, arg, 0, &id);
    if (thread == NULL) {
        std::cerr << "ERROR: failed to create thread\n";
        ::exit(1);
    }
}

void join_thread(HANDLE &thread)
{
    WaitForSingleObject(thread, INFINITE);
}

class ScopedMutex
{
    HANDLE &mutex;

public:

    ScopedMutex(HANDLE &mutex_) : mutex(mutex_)
    {
        WORD result = WaitForSingleObject(mutex, INFINITE);
        if (result != WAIT_OBJECT_0) {
            std::cerr << "ERROR: failed to lock mutex\n";
            ::exit(1);
        }
    };

    ~ScopedMutex()
    {
        ReleaseMutex(mutex);
    };
};

template <typename T, unsigned depth>
class Queue
{
    unsigned head, tail;
    bool full;
    T data[depth];
    HANDLE mutex;

public:

    Queue() : head(0), tail(0), full(false)
    {
        mutex = CreateMutex(NULL, 0, NULL);
        if (mutex == NULL) {
            std::cerr << "ERROR: could not create mutex.\n";
            ::exit(1);
        }
    };

    T &peek()
    {
        while (true) {
            {
                ScopedMutex local_lock(mutex);
                if (full || (head != tail))
                    return data[tail];
            }
            Sleep(0);
        }
    };

    void enqueue(const T &t)
    {
        while (true) {
            {
                ScopedMutex local_lock(mutex);
                if (!full) {
                    data[head++] = t;
                    head %= depth;
                    full = (head == tail);
                    return;
                }
            }
            Sleep(0);
        }
    };

    void dequeue()
    {
        while (true) {
            {
                ScopedMutex local_lock(mutex);
                if (full || (head != tail)) {
                    ++tail;
                    tail %= depth;
                    full = false;
                    return;
                }
            }
            Sleep(0);
        }
    };
};

template <unsigned num_vals, int val, unsigned depth>
void *
producer(void *arg)
{
    Queue<int, depth> &queue = *static_cast<Queue<int, depth> *>(arg);
    for (unsigned i = 0; i < num_vals; ++i) {
        queue.enqueue(val);
    }
    std::cerr << "producer " << val << " exiting.\n";
    return NULL;
}

template <unsigned num_vals, int val, unsigned depth>
void *
consumer(void *arg)
{
    Queue<int, depth> &queue = *static_cast<Queue<int, depth> *>(arg);
    for (unsigned i = 0; i < num_vals; ++i) {
        while (queue.peek() != val)
            Sleep(0);
        if (queue.peek() != val) {
            std::cerr << "ERROR: (" << val << ", " << queue.peek() << ")" << std::endl;
            std::cerr << "But peeking again gives the right value " << queue.peek() << std::endl;
            ::exit(1);
        }
        queue.dequeue();
    }
    return NULL;
}

int
main(int argc, char *argv[])
{
    const unsigned depth = 10;
    const unsigned num_vals = 100000;
    Queue<int, depth> queue;
    HANDLE p1, p2, c1, c2;
    start_thread(p1, producer<num_vals, 1, depth>, &queue);
    start_thread(p2, producer<num_vals, 2, depth>, &queue);
    start_thread(c1, consumer<num_vals, 1, depth>, &queue);
    start_thread(c2, consumer<num_vals, 2, depth>, &queue);
    join_thread(p1);
    join_thread(p2);
    join_thread(c1);
    join_thread(c2);
}

Ответы [ 3 ]

3 голосов
/ 23 февраля 2010

Peek возвращает ссылку в середину массива, в то время как другие потоки активно модифицируют эту память. Чтение любого свойства из этой ссылки будет неопределенным поведением. Вы не можете заглянуть внутрь, ваша очередь должна удалить элемент и вернуть копию.

2 голосов
/ 23 февраля 2010

Может быть, это так:

Очередь полностью заполнена. Из-за какого-то необычного планирования Producer # 2 выполнялся дважды подряд, поэтому следующие два слота в очереди содержат следующие значения: 2, 2

  1. Поток # 1 потребителя находится в своем цикле вращения и только что вызвал peek(). Возвращает ссылку на первый слот. Но за время между инструкцией lea и инструкцией cmp:
  2. Потребительская нить # 2 вызывает dequeue(). Это освобождает слот, который только что вернул Потребитель №1 peek(), и позволяет потоку продюсера продолжить. Второе значение 2 теперь находится в начале очереди.
  3. Производственный поток # 1 теперь перезаписывает первый слот значением 1. Поскольку очередь круговая, этот первый слот теперь является хвостом очереди.

    Эти два слота теперь содержат эти значения: 1, 2

  4. Возвращаясь в потребительский поток № 1, происходит инструкция cmp, вы видите нужное значение и выходите из цикла while

  5. Потребитель # 1 снова вызывает peek() и видит неправильное значение.

Когда peek() вернул копию, вы не видели этого состояния гонки, потому что Потребитель № 1 удерживает мьютекс, когда он получает значение. Когда peek() вернул ссылку, вы извлекаете значение без удержания Mutex и, следовательно, находитесь во власти планировщика команд ЦП и планировщика потоков ОС.

0 голосов
/ 23 февраля 2010

Ваши темы могут по очереди, циклически, что заставит ваших потребителей потреблять правильное число каждый раз.

Вместо этого попросите одного потребителя взять ссылку на число и сравнить его с копией, например:

int& valRef(queue.peek());
int valCopy = valRef;
while( valRef == valCopy){}
printf("Aha! It IS unsafe!\n");

В конце концов, один из производителей перезапишет память, на которую вы ссылаетесь, пока вы проводите сравнение.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...