Похоже ли это на разумный подход к одновременной комбинации набор / очередь? - PullRequest
4 голосов
/ 29 сентября 2010

Обновление : Как отметил Брайан, у моей первоначальной идеи действительно была проблема параллелизма. Это было несколько затенено сигнатурой метода ConcurrentDictionary<TKey, TValue>.AddOrUpdate, который может заставить ленивого мыслителя (такого как я) поверить в то, что все - добавление набора, а также нажатие очереди - каким-то образом произойдет одновременно, атомарно ( т. е. магически).

Оглядываясь назад, с моей стороны было глупо иметь такое ожидание. Фактически, независимо от реализации AddOrUpdate, должно быть ясно, что условие гонки все еще будет существовать в моей первоначальной идее, как указывал Брайан: продвижение в очередь произойдет перед добавлением в набор, следовательно, следующая последовательность события могут произойти:

  1. Элемент помещен в очередь
  2. Элемент выскочил из очереди
  3. Предмет (не) удален из набора
  4. Элемент добавлен в набор

Приведенная выше последовательность приведет к тому, что в наборе окажется элемент, которого нет в очереди, что фактически внесет этот черный список в структуру данных.

Теперь я немного подумал об этом и начинаю думать, что следующий подход может решить эти проблемы:

public bool Enqueue(T item)
{
    // This should:
    // 1. return true only when the item is first added to the set
    // 2. subsequently return false as long as the item is in the set;
    //    and it will not be removed until after it's popped
    if (_set.TryAdd(item, true))
    {
        _queue.Enqueue(item);
        return true;
    }

    return false;
}

Структурируя это таким образом, вызов Enqueue происходит только один раз - после предмета в наборе. Поэтому дублирующиеся элементы в очереди не должны быть проблемой. И кажется, что, поскольку операции очереди «добавляются» операциями набора - то есть элемент помещается только после , он добавляется в набор и извлекается до , он удаляется. из набора - проблематичная последовательность событий, описанная выше, не должна происходить.

Что думают люди? Может быть, это решит проблему? (Как и Брайан, я склонен сомневаться в себе и думаю, что ответ Нет , и я что-то упускаю еще раз. Но эй, это было бы не весело, если бы это было легко, правда? )


Я определенно видел подобные вопросы, поставленные здесь на SO, но удивительно (учитывая, насколько тяжелым является этот сайт .NET), они все, казалось, были для Java.

Мне нужен по существу комбинированный класс set / queue, который является поточно-ориентированным. Другими словами, это должна быть коллекция FIFO, которая не допускает дублирования (поэтому, если тот же элемент уже находится в очереди, последующие вызовы Enqueue будут возвращать false, пока элемент не будет извлечен из очереди).

Я понимаю, что мог бы реализовать это довольно просто с помощью простого HashSet<T> и Queue<T> с блокировкой во всех необходимых местах. Тем не менее, я был заинтересован в выполнении этого с классами ConcurrentDictionary<TKey, TValue> и ConcurrentQueue<T> из .NET 4.0 (также доступны как часть расширений Rx для .NET 3.5, что Я использую), который, как я понимаю, представляет собой коллекцию без блокировок *.

Мой основной план состоял в том, чтобы реализовать эту коллекцию примерно так:

class ConcurrentSetQueue<T>
{
    ConcurrentQueue<T> _queue;
    ConcurrentDictionary<T, bool> _set;

    public ConcurrentSetQueue(IEqualityComparer<T> comparer)
    {
        _queue = new ConcurrentQueue<T>();
        _set = new ConcurrentDictionary<T, bool>(comparer);
    }

    public bool Enqueue(T item)
    {
        // This should:
        // 1. if the key is not present, enqueue the item and return true
        // 2. if the key is already present, do nothing and return false
        return _set.AddOrUpdate(item, EnqueueFirst, EnqueueSecond);
    }

    private bool EnqueueFirst(T item)
    {
        _queue.Enqueue(item);
        return true;
    }

    private bool EnqueueSecond(T item, bool dummyFlag)
    {
        return false;
    }

    public bool TryDequeue(out T item)
    {
        if (_queue.TryDequeue(out item))
        {
            // Another thread could come along here, attempt to enqueue, and
            // fail; however, this seems like an acceptable scenario since the
            // item shouldn't really be considered "popped" until it's been
            // removed from both the queue and the dictionary.
            bool flag;
            _set.TryRemove(item, out flag);

            return true;
        }

        return false;
    }
}

Я правильно обдумал это? На первый взгляд, я не вижу никаких очевидных ошибок в этой основной идее, как я написал выше. Но, может быть, я что-то упускаю. Или, может быть, использование ConcurrentQueue<T> с ConcurrentDictionary<T, bool> на самом деле не является разумным выбором по причинам, которые мне не приходили в голову. Или, может быть, кто-то уже реализовал эту идею в проверенной битве библиотеке где-то, и я должен просто использовать это.

Будем весьма благодарны за любые мысли или полезную информацию по этому вопросу!

* Является ли это строго точным, я не знаю; но тесты производительности показали мне, что они превосходят сопоставимые свернутые вручную коллекции, которые используют блокировку со многими потребительскими потоками.

Ответы [ 2 ]

5 голосов
/ 30 сентября 2010

Если коротко, нет, код, представленный в вопросе, не является потокобезопасным.

Документация MSDN довольно редкая для метода AddOrUpdate, поэтому я взглянул на метод AddOrUpdate вОтражатель.Вот основной алгоритм (я не публикую выходные данные Reflector по юридическим причинам, и это достаточно просто сделать самостоятельно).

TValue value;
do
{
  if (!TryGetValue(...))
  {
    value = AddValueFactoryDelegate(key);
    if (!TryAddInternal(...))
    {
      continue;
    }
    return value;
  }
  value = UpdateValueFactoryDelegate(key);
} 
while (!TryUpdate(...))
return value;

Так что AddValueFactoryDelegate и UpdateValueFactoryDelegate могут выполняться несколько раз.Здесь нет необходимости для дальнейшего объяснения.Должно быть совершенно очевидно, как это сломает ваш код.Я на самом деле немного шокирован тем, что делегаты могут быть выполнены несколько раз.Документация не упоминает об этом.Вы могли бы подумать, что это было бы невероятно важным моментом, чтобы вызывающие абоненты знали, что следует избегать прохождения делегатов с побочными эффектами (как в вашем случае).

Но даже если бы делегаты гарантировали выполнение только один раз, все равно будетпроблема.Должно быть легко визуализировать последовательность проблем, заменив метод Enqueue содержимым метода AddOrUpdate.AddValueFactoryDelegate может выполнить и вставить элемент в _queue, но поток может быть прерван переключением контекста перед добавлением элемента в _set.Затем второй поток может вызвать ваш метод TryDequeue и извлечь этот элемент из _queue, но не удастся удалить его из _set, поскольку он еще не существует.

Обновление:

Хорошо, я не думаю, что можно заставить его работать.ConcurrentQueue отсутствует одна важная операция.Я полагаю, вам нужен CAS эквивалент для TryDequeue метода.Если бы такая операция существовала, то я думаю следующий код был бы правильным.Я использую мифический метод TryDequeueCas, который принимает значение сравнения, которое используется в качестве условия, чтобы сказать, что выполнять эту операцию атомарно, если и только если верхний элемент в очереди равен значению сравнения.Идея точно такая же, как в методе Interlocked.CompareExchange.

Обратите внимание, что код использует значение bool в ConcurrentDictionary в качестве «виртуальной» блокировки для синхронизации координации очереди итолковый словарь.Структура данных также содержит эквивалентную операцию CAS TryUpdate, которая используется для получения и снятия этой «виртуальной» блокировки.А поскольку блокировка является «виртуальной» и фактически не блокирует параллельный доступ, цикл while в методе TryDequeue является обязательным.Это соответствует каноническому шаблону операций CAS в том смысле, что они обычно выполняются в цикле до тех пор, пока не будут выполнены успешно.

В коде также используется шаблон "try-finally" в стиле .NET 4.0 для получения идентификатора блокировки, чтобы помочь избежать проблем, вызванных внеполосными (асинхронными) исключениями.

Примечание. Код снова использует мифический метод ConcurrentQueue.TryDequeueCas.

class ConcurrentSetQueue<T>
{
    ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
    ConcurrentDictionary<T, bool> _set = new ConcurrentDictionary<T, bool>();

    public ConcurrentSetQueue()
    {
    }

    public bool Enqueue(T item)
    {
        bool acquired = false;
        try
        {
            acquired = _set.TryAdd(item, true);
            if (acquired)
            {
                _queue.Enqueue(item);
                return true;
            }
            return false;
        }
        finally
        {
            if (acquired) _set.TryUpdate(item, false, true);
        }
    }

    public bool TryDequeue(out T item)
    {
        while (_queue.TryPeek(out item))
        {
            bool acquired = false;
            try
            {
                acquired = _set.TryUpdate(item, true, false);
                if (acquired)
                {
                    if (_queue.TryDequeueCas(out item, item))
                    {
                        return true;
                    }
                }
            }
            finally
            {
                if (acquired) _set.TryRemove(item, out acquired);
            }
        }
        item = default(T);
        return false;
    }
}

Обновление 2:

В отношенииВаша модификация заметила, насколько она похожа на мою.Фактически, если вы удалите весь пух из моего варианта, метод Enqueue будет иметь точную такую ​​же последовательность утверждений.

Я больше беспокоился о TryDequeue, хотя и этоВот почему я добавил концепцию «виртуальной» блокировки, которая потребовала много дополнительных вещей в моей реализации.Меня особенно беспокоит обратный порядок доступа к структурам данных (словарь, затем очередь в методе Enqueue, но очередь, затем словарь в методе TryDequeue). Но чем больше я думаю о вашем модифицированном подходе, тем больше мне нравитсяЭто.И теперь я думаю, что это , потому что обратного порядка доступа, что это безопасно!

1 голос
/ 29 сентября 2010

Загляните в блог Эрика Лпперта. Вы можете найти то, что вам нравится ... http://blogs.msdn.com/b/ericlippert/archive/tags/immutability/

...