Является ли этот поток без очереди .NET безопасным? - PullRequest
6 голосов
/ 09 октября 2009

Мой вопрос таков: является ли класс, включенный ниже, для класса очереди с одним читателем с одним модулем записи безопасным для потоков? Этот тип очереди называется свободным от блокировки, даже если он будет блокироваться, если очередь заполнена. Структура данных была вдохновлена ​​ Реализацией Марка Гравелла очереди блокировки здесь, в StackOverflow.

Смысл структуры - позволить одному потоку записывать данные в буфер, а другому потоку - читать данные. Все это должно произойти как можно быстрее.

Подобная структура данных описана в статье в DDJ Хербом Саттером , за исключением того, что реализация реализована на C ++. Другое отличие состоит в том, что я использую связанный список ванили, я использую связанный список массивов.

Вместо того, чтобы просто включать фрагмент кода, я включаю все это вместе с комментарием с разрешительной лицензией с открытым исходным кодом (MIT License 1.0) на тот случай, если кто-то сочтет это полезным и захочет использовать его (как есть или изменен).

Это связано с другими вопросами, задаваемыми при переполнении стека, о том, как создавать блокирующие параллельные очереди (см. Создание очереди blockinq в .NET и Реализация потоковобезопасной очереди блокировки в .NET ).

Вот код:

using System;
using System.Collections.Generic;
using System.Threading;
using System.Diagnostics;

namespace CollectionSandbox
{
    /// This is a single reader / singler writer buffered queue implemented
    /// with (almost) no locks. This implementation will block only if filled 
    /// up. The implementation is a linked-list of arrays.
    /// It was inspired by the desire to create a non-blocking version 
    /// of the blocking queue implementation in C# by Marc Gravell
    /// /396882/sozdanie-ocheredi-blokirovki-t-v-net#396891
    class SimpleSharedQueue<T> : IStreamBuffer<T>
    {
        /// Used to signal things are no longer full
        ManualResetEvent canWrite = new ManualResetEvent(true);

        /// This is the size of a buffer 
        const int BUFFER_SIZE = 512;

        /// This is the maximum number of nodes. 
        const int MAX_NODE_COUNT = 100;

        /// This marks the location to write new data to.
        Cursor adder;

        /// This marks the location to read new data from.
        Cursor remover;

        /// Indicates that no more data is going to be written to the node.
        public bool completed = false;

        /// A node is an array of data items, a pointer to the next item,
        /// and in index of the number of occupied items 
        class Node
        {
            /// Where the data is stored.
            public T[] data = new T[BUFFER_SIZE];

            /// The number of data items currently stored in the node.
            public Node next;

            /// The number of data items currently stored in the node.
            public int count;

            /// Default constructor, only used for first node.
            public Node()
            {
                count = 0;
            }

            /// Only ever called by the writer to add new Nodes to the scene
            public Node(T x, Node prev)
            {
                data[0] = x;
                count = 1;

                // The previous node has to be safely updated to point to this node.
                // A reader could looking at the point, while we set it, so this should be 
                // atomic.
                Interlocked.Exchange(ref prev.next, this);
            }
        }

        /// This is used to point to a location within a single node, and can perform 
        /// reads or writers. One cursor will only ever read, and another cursor will only
        /// ever write.
        class Cursor
        {
            /// Points to the parent Queue
            public SimpleSharedQueue<T> q;

            /// The current node
            public Node node;

            /// For a writer, this points to the position that the next item will be written to.
            /// For a reader, this points to the position that the next item will be read from.
            public int current = 0;

            /// Creates a new cursor, pointing to the node
            public Cursor(SimpleSharedQueue<T> q, Node node)
            {
                this.q = q;
                this.node = node;
            }

            /// Used to push more data onto the queue
            public void Write(T x)
            {
                Trace.Assert(current == node.count);

                // Check whether we are at the node limit, and are going to need to allocate a new buffer.
                if (current == BUFFER_SIZE)
                {
                    // Check if the queue is full
                    if (q.IsFull())
                    {
                        // Signal the canWrite event to false
                        q.canWrite.Reset();

                        // Wait until the canWrite event is signaled 
                        q.canWrite.WaitOne();
                    }

                    // create a new node
                    node = new Node(x, node);
                    current = 1;
                }
                else
                {
                    // If the implementation is correct then the reader will never try to access this 
                    // array location while we set it. This is because of the invariant that 
                    // if reader and writer are at the same node: 
                    //    reader.current < node.count 
                    // and 
                    //    writer.current = node.count 
                    node.data[current++] = x;

                    // We have to use interlocked, to assure that we incremeent the count 
                    // atomicalluy, because the reader could be reading it.
                    Interlocked.Increment(ref node.count);
                }
            }

            /// Pulls data from the queue, returns false only if 
            /// there 
            public bool Read(ref T x)
            {
                while (true)
                {
                    if (current < node.count)
                    {
                        x = node.data[current++];
                        return true;
                    }
                    else if ((current == BUFFER_SIZE) && (node.next != null))
                    {
                        // Move the current node to the next one.
                        // We know it is safe to do so.
                        // The old node will have no more references to it it 
                        // and will be deleted by the garbage collector.
                        node = node.next;

                        // If there is a writer thread waiting on the Queue,
                        // then release it.
                        // Conceptually there is a "if (q.IsFull)", but we can't place it 
                        // because that would lead to a Race condition.
                        q.canWrite.Set();

                        // point to the first spot                
                        current = 0;

                        // One of the invariants is that every node created after the first,
                        // will have at least one item. So the following call is safe
                        x = node.data[current++];
                        return true;
                    }

                    // If we get here, we have read the most recently added data.
                    // We then check to see if the writer has finished producing data.
                    if (q.completed)
                        return false;

                    // If we get here there is no data waiting, and no flagging of the completed thread.
                    // Wait a millisecond. The system will also context switch. 
                    // This will allow the writing thread some additional resources to pump out 
                    // more data (especially if it iself is multithreaded)
                    Thread.Sleep(1);
                }
            }
        }

        /// Returns the number of nodes currently used.
        private int NodeCount
        {
            get
            {
                int result = 0;
                Node cur = null;
                Interlocked.Exchange<Node>(ref cur, remover.node);

                // Counts all nodes from the remover to the adder
                // Not efficient, but this is not called often. 
                while (cur != null)
                {
                    ++result;
                    Interlocked.Exchange<Node>(ref cur, cur.next);
                }
                return result;
            }
        }

        /// Construct the queue.
        public SimpleSharedQueue()
        {
            Node root = new Node();
            adder = new Cursor(this, root);
            remover = new Cursor(this, root);
        }

        /// Indicate to the reader that no more data is going to be written.
        public void MarkCompleted()
        {
            completed = true;
        }

        /// Read the next piece of data. Returns false if there is no more data. 
        public bool Read(ref T x)
        {
            return remover.Read(ref x);
        }

        /// Writes more data.
        public void Write(T x)
        {
            adder.Write(x);
        }

        /// Tells us if there are too many nodes, and can't add anymore.
        private bool IsFull()
        {
            return NodeCount == MAX_NODE_COUNT;  
        }
    }
}

Ответы [ 6 ]

7 голосов
/ 09 октября 2009

Microsoft Research CHESS должен оказаться хорошим инструментом для тестирования вашей реализации.

4 голосов
/ 01 ноября 2009

Наличие Sleep() делает подход без блокировки абсолютно бесполезным. Единственная причина противостоять сложностям конструкции без блокировок - это необходимость абсолютной скорости и избежания затрат на семафоры. Использование Sleep (1) полностью уничтожает эту цель.

3 голосов
/ 09 октября 2009

Учитывая, что я не могу найти никаких ссылок на то, что Interlocked.Exchange выполняет блоки чтения или записи, я бы сказал, что нет. Я также хотел бы спросить, почему вы хотите остаться без блокировки, так как редко дает достаточно преимуществ, чтобы противостоять его сложности.

Microsoft провела отличную презентацию на GDC 2009, и вы можете получить слайды здесь .

2 голосов
/ 14 октября 2009

Остерегайтесь шаблона двойной проверки - одиночный замок (как в приведенной выше ссылке: http://www.yoda.arachsys.com/csharp/singleton.html)

Цитировать дословно из «Современного C ++ Design» Андрея Александреску

1 голос
/ 09 октября 2009

Во-первых, меня интересует предположение в этих двух строках последовательного кода:

                node.data[current++] = x;

                // We have to use interlocked, to assure that we incremeent the count 
                // atomicalluy, because the reader could be reading it.
                Interlocked.Increment(ref node.count);

Что сказать, что новое значение node.data [] зафиксировано в этой ячейке памяти? Он не хранится в энергозависимом адресе памяти и поэтому может быть кэширован, если я правильно понимаю? Не приводит ли это к «грязному» чтению? Могут быть и другие места, то же самое верно, но это сразу бросилось в глаза.

Второй многопоточный код, содержащий следующее:

Thread.Sleep(int);

... никогда не бывает хорошим знаком. Если это требуется, то код предназначен для сбоя, если это не требуется, это пустая трата. Мне бы очень хотелось, чтобы они полностью удалили этот API. Поймите, что это просьба подождать хотя бы такое количество времени. С переключением контекста вы почти наверняка будете ждать дольше, намного дольше.

В-третьих, я совершенно не понимаю, как здесь используется интерфейс блокировки. Может быть, я устал и просто упускаю суть; но я не могу найти потенциальный конфликт потоков в обоих потоках, читающих и пишущих в одну и ту же переменную? Казалось бы, единственное использование, которое я мог найти для обмена блокировками, - это изменить содержимое файла node.data [], чтобы исправить # 1 выше.

Наконец, может показаться, что реализация несколько усложнена. Я упускаю суть всего, что связано с курсором / узлом, или он делает то же самое, что и этот класс? (Примечание: я не пробовал и не думаю, что это потокобезопасно, просто пытаюсь свести на нет то, что я думаю, что вы делаете.)

class ReaderWriterQueue<T>
{
    readonly AutoResetEvent _readComplete;
    readonly T[] _buffer;
    readonly int _maxBuffer;
    int _readerPos, _writerPos;

    public ReaderWriterQueue(int maxBuffer)
    {
        _readComplete = new AutoResetEvent(true);
        _maxBuffer = maxBuffer;
        _buffer = new T[_maxBuffer];
        _readerPos = _writerPos = 0;
    }

    public int Next(int current) { return ++current == _maxBuffer ? 0 : current; }

    public bool Read(ref T item)
    {
        if (_readerPos != _writerPos)
        {
            item = _buffer[_readerPos];
            _readerPos = Next(_readerPos);
            return true;
        }
        else
            return false;
    }

    public void Write(T item)
    {
        int next = Next(_writerPos);

        while (next == _readerPos)
            _readComplete.WaitOne();

        _buffer[next] = item;
        _writerPos = next;
    }
}

Значит, я здесь совершенно вне базы и не вижу магию в исходном классе?

Я должен признать одну вещь, я презираю Threading. Я видел, как лучшие разработчики терпят неудачу в этом. В этой статье приведен отличный пример того, как трудно правильно настроить многопоточность: http://www.yoda.arachsys.com/csharp/singleton.html

1 голос
/ 09 октября 2009

Я подозреваю, что это не потокобезопасно - представьте следующий сценарий:

две темы введите cursor.Write. Первый доходит до строки node = new Node(x, node); в истинной половине оператора if (current == BUFFER_SIZE) (но давайте также предположим, что current == BUFFER_SIZE), поэтому, когда 1 добавляется к current, тогда другой входящий поток будет следовать по другому пути через оператор if. Теперь представьте, что поток 1 теряет свой временной интервал, а поток 2 получает его и переходит к вводу оператора if, ошибочно полагая, что условие все еще выполняется. Он должен был войти в другой путь.

Я также не запускал этот код, так что я не уверен, возможны ли мои предположения в этом коде, но если они есть (т.е. ввод курсора. Пишут из нескольких потоков при current == BUFFER_SIZE), тогда это вполне может быть склонным к ошибкам параллелизма.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...