Может ли C # блокировать сообщения FIFO в очереди утечки? - PullRequest
2 голосов
/ 10 октября 2010

Я работаю над академическим проектом с открытым исходным кодом, и теперь мне нужно создать быструю блокирующую очередь FIFO в C #.Моя первая реализация просто обернула синхронизированную очередь (с динамическим расширением) в семафор читателя, затем я решила повторно реализовать следующий (теоретически более быстрый) способ

public class FastFifoQueue<T>
{
    private T[] _array;
    private int _head, _tail, _count;
    private readonly int _capacity;
    private readonly Semaphore _readSema, _writeSema;

    /// <summary>
    /// Initializes FastFifoQueue with the specified capacity
    /// </summary>
    /// <param name="size">Maximum number of elements to store</param>
    public FastFifoQueue(int size)
    {
        //Check if size is power of 2
        //Credit: /485228/kak-proverit-yavlyaetsya-li-chislo-stepeny-2
        if ((size & (size - 1)) != 0)
            throw new ArgumentOutOfRangeException("size", "Size must be a power of 2 for this queue to work");

        _capacity = size;
        _array = new T[size];
        _count = 0;
        _head = int.MinValue; //0 is the same!
        _tail = int.MinValue;

        _readSema = new Semaphore(0, _capacity);
        _writeSema = new Semaphore(_capacity, _capacity);
    }

    public void Enqueue(T item)
    {
        _writeSema.WaitOne();
        int index = Interlocked.Increment(ref _head);
        index %= _capacity;
        if (index < 0) index += _capacity;
        //_array[index] = item;
        Interlocked.Exchange(ref _array[index], item);
        Interlocked.Increment(ref _count);
        _readSema.Release();
    }

    public T Dequeue()
    {
        _readSema.WaitOne();
        int index = Interlocked.Increment(ref _tail);
        index %= _capacity;
        if (index < 0) index += _capacity;
        T ret = Interlocked.Exchange(ref _array[index], null);
        Interlocked.Decrement(ref _count);
        _writeSema.Release();

        return ret;
    }

    public int Count
    {
        get
        {
            return _count;
        }
    }
}

Это классическая реализация очереди FIFOсо статическим массивом мы находим в учебниках.Он предназначен для атомарного увеличения указателей, и поскольку я не могу заставить указатель вернуться к нулю при достижении (емкость-1), я вычисляю по модулю отдельно.Теоретически, использование Interlocked - это то же самое, что и блокировка перед выполнением инкремента, и, поскольку существуют семафоры, несколько производителей / потребителей могут войти в очередь, но только один за раз может изменять указатели очереди.Во-первых, поскольку Interlocked.Increment сначала увеличивает, а затем возвращает, я уже понимаю, что я ограничен в использовании значения после увеличения и начинаю хранить элементы с позиции 1 в массиве.Это не проблема, я вернусь к 0, когда достигну определенного значения

В чем проблема?Вы не поверите, что при большой нагрузке очередь иногда возвращает значение NULL.Я уверен, повторяю, я уверен, что ни один метод не помещает null в очередь.Это определенно верно, потому что я попытался поставить нулевую проверку в Enqueue, чтобы убедиться, и не было выдано никакой ошибки.Я создал тестовый пример для этого с помощью Visual Studio (кстати, я использую двухъядерный процессор, такой как maaaaaaaany people)

    private int _errors;

    [TestMethod()]
    public void ConcurrencyTest()
    {
        const int size = 3; //Perform more tests changing it
        _errors = 0;
        IFifoQueue<object> queue = new FastFifoQueue<object>(2048);
        Thread.CurrentThread.Priority = ThreadPriority.AboveNormal;
        Thread[] producers = new Thread[size], consumers = new Thread[size];

        for (int i = 0; i < size; i++)
        {
            producers[i] = new Thread(LoopProducer) { Priority = ThreadPriority.BelowNormal };
            consumers[i] = new Thread(LoopConsumer) { Priority = ThreadPriority.BelowNormal };
            producers[i].Start(queue);
            consumers[i].Start(queue);
        }

        Thread.Sleep(new TimeSpan(0, 0, 1, 0));

        for (int i = 0; i < size; i++)
        {
            producers[i].Abort();
            consumers[i].Abort();
        }

        Assert.AreEqual(0, _errors);
    }

    private void LoopProducer(object queue)
    {
        try
        {
            IFifoQueue<object> q = (IFifoQueue<object>)queue;
            while (true)
            {
                try
                {
                    q.Enqueue(new object());
                }
                catch
                { }

            }
        }
        catch (ThreadAbortException)
        { }
    }

    private void LoopConsumer(object queue)
    {
        try
        {
            IFifoQueue<object> q = (IFifoQueue<object>)queue;
            while (true)
            {
                object item = q.Dequeue();
                if (item == null) Interlocked.Increment(ref _errors);
            }
        }
        catch (ThreadAbortException)
        { }

    }

Когда нить потребителя получает ноль, подсчитывается ошибка.При выполнении теста с 1 производителем и 1 потребителем он проходит успешно.При выполнении теста с двумя производителями и двумя или более потребителями происходит авария: обнаруживается даже 2000 утечек.Я обнаружил, что проблема может быть в методе Enqueue.Согласно контракту на разработку, производитель может писать только в пустую ячейку ( null ), но, модифицируя мой код с помощью некоторой диагностики, я обнаружил, что иногда производитель пытается писать в непустую ячейку, чтозатем занят «хорошими» данными.

    public void Enqueue(T item)
    {
        _writeSema.WaitOne();
        int index = Interlocked.Increment(ref _head);
        index %= _capacity;
        if (index < 0) index += _capacity;
        //_array[index] = item;
        T leak = Interlocked.Exchange(ref _array[index], item);

        //Diagnostic code
        if (leak != null)
        {
            throw new InvalidOperationException("Too bad...");
        }
        Interlocked.Increment(ref _count);

        _readSema.Release();
    }

«Слишком плохие» исключения случаются часто.Но слишком странно, что конфликт возникает из-за одновременной записи, потому что приращения являются атомарными, а семафор автора позволяет только столько авторов, сколько ячеек свободного массива.

Может кто-нибудь помочь мне с этим?Буду очень признателен, если вы поделитесь со мной своими навыками и опытом.

Спасибо.

Ответы [ 3 ]

6 голосов
/ 10 октября 2010

Должен сказать, это поразило меня как очень умная идея, и я думал об этом пока , прежде чем начал понимать, где (я думаю ) ошибка здесь. Так что, с одной стороны, спасибо за такой умный дизайн! Но, в то же время, позор вам за демонстрацию " закона Кернигана ":

Отладка в два раза сложнее, чем писать код в первую очередь. Поэтому, если вы пишете код настолько умно, насколько это возможно, вы, по определению, недостаточно умны для его отладки.

В основном проблема заключается в следующем: вы предполагаете, что WaitOne и Release эффективно вызывают сериализацию всех ваших операций Enqueue и Dequeue; но это не совсем то, что здесь происходит. Помните, что класс Semaphore используется для ограничения количества потоков, обращающихся к ресурсу , , а не для обеспечения определенного порядка событий. То, что происходит между каждый WaitOne и Release, не обязательно происходит в том же «порядке потоков», что и сами WaitOne и Release.

Это сложно объяснить словами, поэтому позвольте мне привести наглядную иллюстрацию.

Допустим, ваша очередь имеет емкость 8 и выглядит следующим образом (пусть 0 представляет null и x представляют объект):

[ x x x x x x x x ]

Итак, Enqueue был вызван 8 раз, и очередь заполнена. Поэтому ваш _writeSema семафор заблокируется на WaitOne, а ваш _readSema семафор немедленно вернется на WaitOne.

Теперь давайте предположим, что Dequeue вызывается более или менее одновременно в 3 разных потоках. Давайте назовем их T1, T2 и T3.

Прежде чем продолжить, позвольте мне применить некоторые метки к вашей реализации Dequeue, для справки:

public T Dequeue()
{
    _readSema.WaitOne();                                   // A
    int index = Interlocked.Increment(ref _tail);          // B
    index %= _capacity;
    if (index < 0) index += _capacity;
    T ret = Interlocked.Exchange(ref _array[index], null); // C
    Interlocked.Decrement(ref _count);
    _writeSema.Release();                                  // D

    return ret;
}

Хорошо, значит, T1, T2 и T3 все прошли точку A . Затем для простоты предположим, что каждая из них достигает линии B «по порядку», так что у T1 index равно 0, у T2 index равно 1, а у T3 index равно 2 .

Пока все хорошо. Но вот что надо: нет никакой гарантии, что отсюда T1, T2 и T3 попадут в линию D в любом указанном порядке . Предположим, что T3 на самом деле получает впереди T1 и T2, перемещаясь за линию C (и таким образом устанавливая _array[2] в null) и вплоть до линии D .

После этого будет сигнализироваться _writeSema, что означает, что в вашей очереди есть один слот для записи, верно? Но ваша очередь теперь выглядит так!

[ x x 0 x x x x x ]

Таким образом, если в то же время появился другой поток с вызовом Enqueue, он на самом деле получит мимо _writeSema.WaitOne, увеличит _head и получит index из 0, хотя слот 0 не пуст . Результатом этого будет то, что элемент в слоте 0 может быть на самом деле перезаписан до того, как T1 (помните его?) Прочитает его.

Чтобы понять, откуда берутся ваши значения null, вам нужно лишь визуализировать обратную сторону процесса, который я только что описал. То есть предположим, что ваша очередь выглядит так:

[ 0 0 0 0 0 0 0 0 ]

Три потока, T1, T2 и T3, все вызывают Enqueue почти одновременно. T3 увеличивает _head last , но вставляет свой элемент (в _array[2]) и вызывает _readSema.Release first , в результате чего сигнализируется _readSema, но очередь выглядит как:

[ 0 0 x 0 0 0 0 0 ]

Так что, если в это же время появился другой поток с вызовом Dequeue (до того, как T1 и T2 закончат свою работу), он пройдет _readSema.WaitOne, увеличит _tail и получит index из 0, , хотя слот 0 является пустым .

Итак, вот ваша проблема . Что касается решения , у меня сейчас нет никаких предложений. Дайте мне немного времени подумать ... (Я отправляю этот ответ сейчас, потому что он свеж в моей памяти, и я чувствую, что он может вам помочь.)

3 голосов
/ 10 октября 2010

(+ 1 к Дэну Тао, за которого я голосую, есть ответ). Эту очередь можно изменить на что-то вроде этого ...

while (Interlocked.CompareExchange(ref _array[index], item, null) != null)
    ;

Эту очередь можно изменить на что-то вроде этого ...

while( (ret = Interlocked.Exchange(ref _array[index], null)) == null)
    ;

Это основано на превосходном анализе Дана Тао.Поскольку индексы получаются атомарно, то (при условии, что в методах enqueue или dequeue нет потоков, которые умирают или заканчиваются), читателю гарантированно в конечном итоге будет заполнена его ячейка, или писателю гарантированно освободится его ячейка (ноль).

2 голосов
/ 13 октября 2010

Спасибо, Дэн Тао и Лес,

Я очень ценю вашу помощь.Дэн, ты открыл мой разум: не важно, сколько производителей / потребителей находится внутри критической секции, важно, чтобы блокировки были разблокированы в порядке .Лес, ты нашел решение проблемы.

Теперь пришло время, наконец, ответить на мой собственный вопрос с помощью окончательного кода, который я сделал благодаря помощи вас обоих.Ну, это немного, но это небольшое улучшение из кода Леса

Постановка в очередь:

while (Interlocked.CompareExchange(ref _array[index], item, null) != null)
            Thread.Sleep(0);

Задание в очереди:

while ((ret = Interlocked.Exchange(ref _array[index], null)) == null)
            Thread.Sleep(0);

Почему Thread.Sleep (0)?Когда мы обнаруживаем, что элемент не может быть извлечен / сохранен, зачем сразу проверять снова?Мне нужно принудительное переключение контекста , чтобы другие потоки могли читать / писать.Очевидно, что следующий поток, который будет запланирован , может быть другим потоком, не способным работать, но, по крайней мере, мы его форсируем.Источник: http://progfeatures.blogspot.com/2009/05/how-to-force-thread-to-perform-context.html

Я также протестировал код предыдущего контрольного примера, чтобы получить доказательства своих утверждений:

без сна (0)

Read 6164150 elements
Wrote 6322541 elements
Read 5885192 elements
Wrote 5785144 elements
Wrote 6439924 elements
Read 6497471 elements

со сном(0)

Wrote 7135907 elements
Read 6361996 elements
Wrote 6761158 elements
Read 6203202 elements
Wrote 5257581 elements
Read 6587568 elements

Я знаю, что это не "великое" открытие, и я не получу приз Тьюринга за эти номера.Увеличение производительности не драматично, но больше нуля.Принудительное переключение контекста позволяет выполнять больше операций RW (= более высокая пропускная способность).

Для ясности : в моем тесте я просто оцениваю производительность очереди, а не моделирую производителя /потребительская проблема, поэтому не волнует, если в конце теста через минуту все еще остаются элементы в очереди.Но я только что продемонстрировал, что мой подход работает, спасибо вам всем.

Код, доступный с открытым исходным кодом как MS-RL: http://logbus -ng.svn.sourceforge.net / viewvc / logbus-ng / trunk/logbus-core/It.Unina.Dis.Logbus/Utils/FastFifoQueue.cs?revision=461&view=markup

...