Неблокирующая одновременная коллекция? - PullRequest
7 голосов
/ 19 июля 2010

System.Collections.Concurrent имеет несколько новых коллекций, которые очень хорошо работают в многопоточных средах.Тем не менее, они немного ограничены.Либо они блокируются, пока элемент не становится доступным, либо возвращают default(T) (методы TryXXX).

Мне нужна коллекция, которая является поточно-ориентированной, но вместо блокировки вызывающей нити она использует обратный вызов, чтобы сообщить мне, что хотя бы один элемент доступен.

Мое текущее решение - использовать BlockingCollection, но использовать APM с делегатом для получения следующего элемента.Другими словами, я создаю делегата для метода, который Take s из коллекции, и выполняю этот делегат, используя BeginInvoke.

К сожалению, мне нужно сохранить много состояния в своем классе, чтобычтобы сделать это.Хуже того, класс не является потокобезопасным;он может использоваться только одним потоком.Я отклоняюсь от края удобства обслуживания, которое я предпочел бы не делать.

Я знаю, что есть некоторые библиотеки, которые делают то, что я здесь делаю, довольно простым (я считаю, что Reactive Framework - это одна изиз них), но я хотел бы достичь своих целей без добавления каких-либо ссылок за пределами версии 4. фреймворка.

Существуют ли какие-либо более совершенные шаблоны, которые я могу использовать, которые не требуют внешних ссылок, которые выполняют мою цель?


tl; др:

Существуют ли какие-либо шаблоны, которые удовлетворяют требованию:

"Мне нужно сообщить о коллекции, к которой я готовследующий элемент, и чтобы коллекция выполнила обратный вызов, когда этот следующий элемент прибыл, без блокирования каких-либо потоков. "

Ответы [ 2 ]

4 голосов
/ 19 июля 2010

Я думаю, у меня есть два возможных решения. Меня это не особо устраивает, но они, по крайней мере, предоставляют разумную альтернативу подходу APM.

Первый не соответствует вашему требованию об отсутствии блокирующего потока, но я думаю, что это довольно элегантно, потому что вы можете зарегистрировать обратные вызовы, и они будут вызываться циклически, но у вас все еще есть возможность вызывать Take или TryTake как обычно для BlockingCollection. Этот код принудительно регистрирует обратные вызовы каждый раз, когда запрашивается элемент. Это сигнальный механизм для коллекции. Хорошая особенность этого подхода заключается в том, что вызовы Take не страдают от голода, как в моем втором решении.

public class NotifyingBlockingCollection<T> : BlockingCollection<T>
{
    private Thread m_Notifier;
    private BlockingCollection<Action<T>> m_Callbacks = new BlockingCollection<Action<T>>();

    public NotifyingBlockingCollection()
    {
        m_Notifier = new Thread(Notify);
        m_Notifier.IsBackground = true;
        m_Notifier.Start();
    }

    private void Notify()
    {
        while (true)
        {
            Action<T> callback = m_Callbacks.Take();
            T item = Take();
            callback.BeginInvoke(item, null, null); // Transfer to the thread pool.
        }
    }

    public void RegisterForTake(Action<T> callback)
    {
        m_Callbacks.Add(callback);
    }
}

Второй соответствует вашему требованию не блокирующая нить. Обратите внимание, как он передает вызов обратного вызова в пул потоков. Я сделал это, потому что я думаю, что если бы он выполнялся синхронно, то блокировки были бы дольше удерживаться, что приводило к узким местам Add и RegisterForTake. Я внимательно изучил его и не думаю, что он может быть заблокирован в реальном времени (доступны как элемент, так и обратный вызов, но обратный вызов не выполняется), но вы можете проверить это самостоятельно, чтобы проверить. Единственная проблема здесь заключается в том, что вызов Take будет истощен, поскольку обратные вызовы всегда имеют приоритет.

public class NotifyingBlockingCollection<T>
{
    private BlockingCollection<T> m_Items = new BlockingCollection<T>();
    private Queue<Action<T>> m_Callbacks = new Queue<Action<T>>();

    public NotifyingBlockingCollection()
    {
    }

    public void Add(T item)
    {
        lock (m_Callbacks)
        {
            if (m_Callbacks.Count > 0)
            {
                Action<T> callback = m_Callbacks.Dequeue();
                callback.BeginInvoke(item, null, null); // Transfer to the thread pool.
            }
            else
            {
                m_Items.Add(item);
            }
        }
    }

    public T Take()
    {
        return m_Items.Take();
    }

    public void RegisterForTake(Action<T> callback)
    {
        lock (m_Callbacks)
        {
            T item;
            if (m_Items.TryTake(out item))
            {
                callback.BeginInvoke(item, null, null); // Transfer to the thread pool.
            }
            else
            {
                m_Callbacks.Enqueue(callback);
            }
        }
    }
}
3 голосов
/ 20 июля 2010

Как насчет этого?(Возможно, для именования можно использовать какую-то работу. И обратите внимание, что это не проверено.)

public class CallbackCollection<T>
{
    // Sychronization object to prevent race conditions.
    private object _SyncObject = new object();

    // A queue for callbacks that are waiting for items.
    private ConcurrentQueue<Action<T>> _Callbacks = new ConcurrentQueue<Action<T>>();

    // A queue for items that are waiting for callbacks.
    private ConcurrentQueue<T> _Items = new ConcurrentQueue<T>();

    public void Add(T item)
    {
        Action<T> callback;
        lock (_SyncObject)
        {
            // Try to get a callback. If no callback is available,
            // then enqueue the item to wait for the next callback
            // and return.
            if (!_Callbacks.TryDequeue(out callback))
            {
                _Items.Enqueue(item);
                return;
            }
        }

        ExecuteCallback(callback, item);
    }

    public void TakeAndCallback(Action<T> callback)
    {
        T item;
        lock(_SyncObject)
        {
            // Try to get an item. If no item is available, then
            // enqueue the callback to wait for the next item
            // and return.
            if (!_Items.TryDequeue(out item))
            {
                _Callbacks.Enqueue(callback);
                return;
            }
        }
        ExecuteCallback(callback, item);
    }

    private void ExecuteCallback(Action<T> callback, T item)
    {
        // Use a new Task to execute the callback so that we don't
        // execute it on the current thread.
        Task.Factory.StartNew(() => callback.Invoke(item));
    }
}
...