ConcurrentDictionary с несколькими значениями на ключ, удаляя пустые записи - PullRequest
0 голосов
/ 15 марта 2020

ConcurrentDictionary хорошо работает в параллельных ситуациях, когда ключи сопоставляются с одним значением. При сопоставлении с несколькими значениями легко создать ConcurrentDictionary<K, List<V>> и защитить его функции добавления / удаления.

ConcurrentDictionary <string, List<string>> d;

// Add
var list = d.GetOrAdd ("key", x => new List<string> ());
lock (list) {
    list.Add ("value to add");
}

// Remove
if (d.TryGetValue ("key", out var list)) {
    lock (list) {
        list.Remove ("value to remove");
    }
}

Однако вышеизложенное предполагало, что пустым спискам разрешено оставаться. Я не хочу этого Но удаление пустых пар не представляется возможным в виде атома c. Можно попробовать:

if (d.TryGetValue ("key", out var list)) {
    lock (list) {
        if (list.Remove ("value to remove") && list.Count == 0) {
            d.TryRemove ("key", out _);
        }
    }
}

Но это имеет условие гонки, когда другой поток захватывает список раньше, но добавляет его после того, как он был очищен и удален в другом месте:

  • A: получить list
  • B: получить список
  • B: заблокировать, удалить из списка
  • B: список пуст, удалить ключ, разблокировать
  • A: заблокировать, добавить в список, разблокировать

Блокировка словаря невозможна (это другой вариант использования).

Насколько я могу судить, решение обычно можно найти с помощью сравнения -and-swap операции и замена списка, например, неизменным массивом, который затем заменяется полностью. Однако, учитывая, что ConcurrentDictionary не предлагает TryRemove с ожидаемым значением для сравнения, я не совсем понимаю, как это сделать. Возможно, существует двухэтапное решение?

Использование параметра out TryRemove для повторного добавления значений после их удаления (для исправления расы) невозможно - словарь ненадолго будет находиться в несогласованном состоянии.

На этом сайте есть много вопросов о подобных сценариях ios, но большинство из них страдают от тривиальных ошибок или не удаляют пустые записи. Существует очень связанный вопрос , который спрашивает, возможно ли это сделать. К сожалению, ему пять лет, которому уделяется очень мало внимания, и у него нет решения, кроме как прибегнуть к замкам (что побеждает цель). Возможно, с тех пор открылся лучший путь.


(отредактированный пример для ясности)

Ответы [ 3 ]

2 голосов
/ 17 марта 2020

Мне удалось реализовать класс ConcurrentMultiDictionary, который хранит несколько значений на ключ и удаляет пустые записи. Значения каждого ключа хранятся в HashSet, поэтому каждый ключ имеет уникальные значения. Это увеличивает производительность удаления значения, когда число значений велико. Если уникальность является проблемой, тогда HashSet следует заменить на List, а метод Add следует изменить, чтобы он возвращал void вместо bool.

Атомность Операции добавления и удаления выполняются вращением. Когда пакет значений становится пустым, он помечается как «отброшенный». Добавление значений в выброшенную сумку недопустимо, поэтому операция Add вращается до тех пор, пока не возьмет неотброшенную сумку. Операция Remove тоже вращается. Таким образом, единственная нить, которой разрешено снимать выброшенный мешок, - это та же нить, которая пометила мешок как выброшенный. Все остальные темы будут вращаться, пока это не произойдет. SpinWait структуры используются для вращения, чтобы обеспечить эффективность даже на однопроцессорных машинах.

Нерешаемая проблема этой реализации состоит в том, как реализовать метод ToArray, который делает снимок всех ключей и значений, хранящихся в словаре. Метод ConcurrentDictionary.ToArray возвращает снимок ключей, но пакеты могут постоянно меняться, и поэтому я считаю, что это неразрешимо.

Даже реализация интерфейса IEnumerable немного сложно, потому что, если мы просто перечислим KeyValuePair s основного словаря, большинство пакетов может быть отброшено в момент получения их значений. Таким образом, во время перечисления пакет каждого ключа извлекается индивидуально, чтобы быть как можно более актуальным.

public class ConcurrentMultiDictionary<TKey, TValue>
    : IEnumerable<KeyValuePair<TKey, TValue[]>>
{
    private class Bag : HashSet<TValue>
    {
        public bool IsDiscarded { get; set; }
    }

    private readonly ConcurrentDictionary<TKey, Bag> _dictionary;

    public ConcurrentMultiDictionary()
    {
        _dictionary = new ConcurrentDictionary<TKey, Bag>();
    }

    public int Count => _dictionary.Count;

    public bool Add(TKey key, TValue value)
    {
        var spinWait = new SpinWait();
        while (true)
        {
            var bag = _dictionary.GetOrAdd(key, _ => new Bag());
            lock (bag)
            {
                if (!bag.IsDiscarded) return bag.Add(value);
            }
            spinWait.SpinOnce();
        }
    }

    public bool Remove(TKey key, TValue value)
    {
        var spinWait = new SpinWait();
        while (true)
        {
            if (!_dictionary.TryGetValue(key, out var bag)) return false;
            bool spinAndRetry = false;
            lock (bag)
            {
                if (bag.IsDiscarded)
                {
                    spinAndRetry = true;
                }
                else
                {
                    bool valueRemoved = bag.Remove(value);
                    if (!valueRemoved) return false;
                    if (bag.Count != 0) return true;
                    bag.IsDiscarded = true;
                }
            }
            if (spinAndRetry) { spinWait.SpinOnce(); continue; }
            bool keyRemoved = _dictionary.TryRemove(key, out var currentBag);
            Debug.Assert(keyRemoved, $"Key {key} was not removed");
            Debug.Assert(bag == currentBag, $"Removed wrong bag");
            return true;
        }
    }

    public bool TryGetValues(TKey key, out TValue[] values)
    {
        if (!_dictionary.TryGetValue(key, out var bag)) { values = null; return false; }
        bool isDiscarded;
        lock (bag) { isDiscarded = bag.IsDiscarded; values = bag.ToArray(); }
        if (isDiscarded) { values = null; return false; }
        return true;
    }

    public bool Contains(TKey key, TValue value)
    {
        if (!_dictionary.TryGetValue(key, out var bag)) return false;
        lock (bag) return !bag.IsDiscarded && bag.Contains(value);
    }

    public bool ContainsKey(TKey key) => _dictionary.ContainsKey(key);

    public ICollection<TKey> Keys => _dictionary.Keys;

    public IEnumerator<KeyValuePair<TKey, TValue[]>> GetEnumerator()
    {
        foreach (var key in _dictionary.Keys)
        {
            if (this.TryGetValues(key, out var values))
            {
                yield return new KeyValuePair<TKey, TValue[]>(key, values);
            }
        }
    }

    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}

Эта реализация была протестирована с 8 одновременными работниками, мутировавшими словарь миллион раз на одного работника, и никаких противоречий относительно зарегистрировано количество добавлений и удалений.

1 голос
/ 16 марта 2020

Кажется, что нет практического способа удалить пустую коллекцию (даже если она синхронизирована) из параллельного словаря без проблем состояния гонки. Существуют определенные факты, которые не позволяют сделать это возможным, о чем говорилось в комментариях как к вопросу, так и к самостоятельному ответу ОП.

Однако то, что я написал в своем комментарии, казалось выполнимым, и я хотел попробовать .

Я хочу обсудить недостатки этой реализации сразу после этого, и я должен также сказать, что ваши комментарии (если они были получены) являются для меня наиболее ценными.

Во-первых, использование :

    static void Main(string[] args)
    {
        var myDictionary = new ConcurrentDictionary<string, IList<int>>();
        IList<int> myList = myDictionary.AddSelfRemovingList<string, int>("myList");

        myList.Add(5);
        myList.Add(6);

        myList.Remove(6);
        myList.Remove(5);

        IList<int> existingInstance;

        // returns false:
        bool exists = myDictionary.TryGetValue("myList", out existingInstance);

        // throws HasAlreadyRemovedSelfException:
        myList.Add(3);
    }

AddSelfRemovingList - это метод расширения, облегчающий задачу.

Для обсуждения:

  1. Недопустимо удаление элемент из коллекции, имеющий побочный эффект удаления ссылки на коллекцию из словаря-владельца.
  2. Также не рекомендуется делать коллекцию устаревшей (непригодной для использования), когда все ее элементы удалены. Существует большая вероятность того, что потребитель коллекции захочет очистить и повторно заполнить коллекцию, и эта реализация не позволяет этого.
  3. Это заставляет использовать IList<T> абстракцию и пользовательскую реализацию над * 1021. *

Хотя это обеспечивает реальный потокобезопасный способ удаления только что опустошенной коллекции из словаря, кажется, что у нее больше минусов, чем плюсов. Это следует использовать только в закрытом контексте, где коллекции внутри параллельного словаря открыты для внешнего использования, и где немедленное удаление коллекции после очистки, даже если какой-то другой поток обращается к ней в данный момент, является существенным.

Вот метод расширения для создания и добавления в словарь списка самоудаления:

public static class ConcurrentDictionaryExtensions
{
    public static IList<TValue> AddSelfRemovingList<TKey, TValue>(this ConcurrentDictionary<TKey, IList<TValue>> dictionaryInstance, TKey key)
    {
        var newInstance = new SelfRemovingConcurrentList<TKey, TValue>(dictionaryInstance, key);
        if (!dictionaryInstance.TryAdd(key, newInstance))
        {
            throw new ArgumentException("ownerAccessKey", "The passed ownerAccessKey has already exist in the parent dictionary");
        }
        return newInstance;
    }
}

И наконец; Вот синхронизированная, самоудерживающаяся реализация IList<T>:

public class SelfRemovingConcurrentList<TKey, TValue> : IList<TValue>
{
    private ConcurrentDictionary<TKey, IList<TValue>> owner;
    private TKey ownerAccessKey;
    List<TValue> underlyingList = new List<TValue>();
    private bool hasRemovedSelf;

    public class HasAlreadyRemovedSelfException : Exception
    {

    }

    internal SelfRemovingConcurrentList(ConcurrentDictionary<TKey, IList<TValue>> owner, TKey ownerAccessKey)
    {
        this.owner = owner;
        this.ownerAccessKey = ownerAccessKey;
    }

    private void ThrowIfHasAlreadyRemovedSelf()
    {
        if (hasRemovedSelf)
        {
            throw new HasAlreadyRemovedSelfException();
        }
    }

    [MethodImpl(MethodImplOptions.Synchronized)]
    int IList<TValue>.IndexOf(TValue item)
    {
        ThrowIfHasAlreadyRemovedSelf();
        return underlyingList.IndexOf(item);
    }

    [MethodImpl(MethodImplOptions.Synchronized)]
    void IList<TValue>.Insert(int index, TValue item)
    {
        ThrowIfHasAlreadyRemovedSelf();
        underlyingList.Insert(index, item);
    }

    [MethodImpl(MethodImplOptions.Synchronized)]
    void IList<TValue>.RemoveAt(int index)
    {
        ThrowIfHasAlreadyRemovedSelf();
        underlyingList.RemoveAt(index);
        if (underlyingList.Count == 0)
        {
            hasRemovedSelf = true;
            IList<TValue> removedInstance;
            if (!owner.TryRemove(ownerAccessKey, out removedInstance))
            {
                // Just ignore.
                // What we want to do is to remove ourself from the owner (concurrent dictionary)
                // and it seems like we have already been removed!
            }
        }
    }

    TValue IList<TValue>.this[int index]
    {
        [MethodImpl(MethodImplOptions.Synchronized)]
        get
        {
            ThrowIfHasAlreadyRemovedSelf();
            return underlyingList[index];
        }
        [MethodImpl(MethodImplOptions.Synchronized)]
        set
        {
            ThrowIfHasAlreadyRemovedSelf();
            underlyingList[index] = value;
        }
    }

    [MethodImpl(MethodImplOptions.Synchronized)]
    void ICollection<TValue>.Add(TValue item)
    {
        ThrowIfHasAlreadyRemovedSelf();
        underlyingList.Add(item);
    }

    [MethodImpl(MethodImplOptions.Synchronized)]
    void ICollection<TValue>.Clear()
    {
        ThrowIfHasAlreadyRemovedSelf();
        underlyingList.Clear();
        hasRemovedSelf = true;
        IList<TValue> removedInstance;
        if (!owner.TryRemove(ownerAccessKey, out removedInstance))
        {
            // Just ignore.
            // What we want to do is to remove ourself from the owner (concurrent dictionary)
            // and it seems like we have already been removed!
        }
    }

    [MethodImpl(MethodImplOptions.Synchronized)]
    bool ICollection<TValue>.Contains(TValue item)
    {
        ThrowIfHasAlreadyRemovedSelf();
        return underlyingList.Contains(item);
    }

    [MethodImpl(MethodImplOptions.Synchronized)]
    void ICollection<TValue>.CopyTo(TValue[] array, int arrayIndex)
    {
        ThrowIfHasAlreadyRemovedSelf();
        underlyingList.CopyTo(array, arrayIndex);
    }

    int ICollection<TValue>.Count
    {
        [MethodImpl(MethodImplOptions.Synchronized)]
        get
        {
            ThrowIfHasAlreadyRemovedSelf();
            return underlyingList.Count;
        }
    }

    bool ICollection<TValue>.IsReadOnly
    {
        [MethodImpl(MethodImplOptions.Synchronized)]
        get
        {
            ThrowIfHasAlreadyRemovedSelf();
            return false;
        }
    }

    [MethodImpl(MethodImplOptions.Synchronized)]
    bool ICollection<TValue>.Remove(TValue item)
    {
        ThrowIfHasAlreadyRemovedSelf();
        bool removalResult = underlyingList.Remove(item);
        if (underlyingList.Count == 0)
        {
            hasRemovedSelf = true;
            IList<TValue> removedInstance;
            if (!owner.TryRemove(ownerAccessKey, out removedInstance))
            {
                // Just ignore.
                // What we want to do is to remove ourself from the owner (concurrent dictionary)
                // and it seems like we have already been removed!
            }
        }
        return removalResult;
    }

    [MethodImpl(MethodImplOptions.Synchronized)]
    IEnumerator<TValue> IEnumerable<TValue>.GetEnumerator()
    {
        ThrowIfHasAlreadyRemovedSelf();
        return underlyingList.GetEnumerator();
    }

    [MethodImpl(MethodImplOptions.Synchronized)]
    IEnumerator IEnumerable.GetEnumerator()
    {
        ThrowIfHasAlreadyRemovedSelf();
        return underlyingList.GetEnumerator();
    }
}
0 голосов
/ 15 марта 2020

Вопрос может быть решен с помощью словаря, который предлагает вариант TryRemove, который сначала проверяет, что текущее значение равно ожидаемому значению. Только если значения сравниваются равными, значение заменяется (атомарно). В противном случае операция возвращает ошибку.

Оказывается, ConcurrentDictionary уже реализует именно эту функцию:

/// <summary>
/// Removes the specified key from the dictionary if it exists and returns its associated value.
/// If matchValue flag is set, the key will be removed only if is associated with a particular
/// value.
/// </summary>
/// <param name="key">The key to search for and remove if it exists.</param>
/// <param name="value">The variable into which the removed value, if found, is stored.</param>
/// <param name="matchValue">Whether removal of the key is conditional on its value.</param>
/// <param name="oldValue">The conditional value to compare against if <paramref name="matchValue"/> is true</param>
/// <returns></returns>
private bool TryRemoveInternal(TKey key, out TValue value, bool matchValue, TValue oldValue)

TryRemove вызывает это (с matchValue установить в ложь). К сожалению, метод не раскрыт (это private). Таким образом, простым решением было бы скопировать существующий класс и изменить этот метод на publi c. Я не уверен, почему это не было разоблачено. Если бы указанные функции c не работали должным образом, matchValue, скорее всего, были бы удалены ранее.

Как отмечает @Theodor Zoulias, также можно вызывать частный метод TryRemoveInternal с помощью отражение . Насколько я знаю, это можно сделать без существенного влияния на производительность.

Существуют также сторонние реализации с (заявленной) высокой производительностью и параллелизмом, которые демонстрируют TryRemove (..., expectedValue).

После выбора реализации следующий код реализует запрашиваемую функциональность. Он использует операции atomi c сравнения и обмена, предоставленные словарем в al oop, до тех пор, пока он не завершится успешно (аналогично тому, что многие параллельные словари тоже делают внутри). Насколько я знаю, это типичный подход в алгоритмах без блокировки.

// Use any third-party dictionary that offers TryRemove() with
// a value to compare against (two are mentioned above)
ConcurrentDictionary<TKey, List<TValue>> d;
...

// To remove a value from key:
// Loop until the compare-and-swap of either update or removal succeeded
while (true)
{
    // If the key does not exist, exit
    if (!d.TryGetValue (key, out var list)) {
        break;
    }

    // Remove the value from this key's entry:
    // Consider the old value immutable, copy-and-modify it instead
    List<TValue> newlist;
    lock (list) {
        newlist = list.Where (it => it != valueToRemove).ToList ();
    }

    // If the value list is not empty, compare-and-update it
    if (newlist.Count > 0) {
        if (d.TryUpdate (key: key, newValue: newlist, expectedValue: list)) {
            return;
        }
    }
    else // The key's value list is empty - compare-and-remove the entire key
    {
        // Remove the key iff the associated value is still the same
        if (d.TryRemove (key: key, expectedValue: list)) { // Note that list is an in-, not an out-parameter
            return;
        }
    }

    // If we reach this point, the operation failed - try again
}
...