как получить доступ к основной параллельной очереди по умолчанию для блокирующей коллекции - PullRequest
6 голосов
/ 07 июля 2011

У меня есть несколько производителей и один потребитель.Однако, если в очереди есть что-то, что еще не используется, производитель не должен ставить его снова в очередь.(уникальная блокирующая коллекция без дубликатов, использующая параллельную очередь по умолчанию)

if (!myBlockingColl.Contains(item))
    myBlockingColl.Add(item)

Однако в блокирующем столбце нет метода содержимого, и при этом он не предоставляет никакого метода типа trypeek ().Как я могу получить доступ к лежащей в основе параллельной очереди, чтобы я мог сделать что-то вроде

if (!myBlockingColl.myConcurQ.trypeek(item)
  myBlockingColl.Add(item)

В хвостовой прокрутке.Пожалуйста помоги.спасибо

Ответы [ 3 ]

8 голосов
/ 07 июля 2011

Это интересный вопрос.Это первый раз, когда я вижу, как кто-то запрашивает очередь блокировки, которая игнорирует дубликаты.Как ни странно, я не смог найти ничего похожего на то, что вы хотите, которое уже существует в BCL.Я говорю, что это странно, потому что BlockingCollection может принять IProducerConsumerCollection в качестве базовой коллекции, у которой есть метод TryAdd, который объявляется способным завершаться ошибкой при обнаружении дубликатов.Проблема в том, что я не вижу конкретной реализации IProducerConsumerCollection, которая предотвращает дублирование.По крайней мере, мы можем написать свои собственные.

public class NoDuplicatesConcurrentQueue<T> : IProducerConsumerCollection<T>
{
  // TODO: You will need to fully implement IProducerConsumerCollection.

  private Queue<T> queue = new Queue<T>();

  public bool TryAdd(T item)
  {
    lock (queue)
    {
      if (!queue.Contains(item))
      {
        queue.Enqueue(item);
        return true;
      }
      return false;
    }
  }

  public bool TryTake(out T item)
  {
    lock (queue)
    {
      item = null;
      if (queue.Count > 0)
      {
        item = queue.Dequeue();
      }
      return item != null;
    }
  }
}

Теперь, когда у нас есть IProducerConsumerCollection, который не принимает дубликаты, мы можем использовать его следующим образом:

public class Example
{
  private BlockingCollection<object> queue = new BlockingCollection<object>(new NoDuplicatesConcurrentQueue<object>());

  public Example()
  {
    new Thread(Consume).Start();
  }

  public void Produce(object item)
  {
    bool unique = queue.TryAdd(item);
  }

  private void Consume()
  {
    while (true)
    {
      object item = queue.Take();
    }
  }
}

Вам может не понравитьсямоя реализация NoDuplicatesConcurrentQueue.Вы, безусловно, можете свободно реализовывать свои собственные, используя ConcurrentQueue или что-то еще, если считаете, что вам нужна производительность с низким уровнем блокировки, которую обеспечивают коллекции TPL.

Обновление:

Я был в состоянии проверить код этим утром.Есть хорошие и плохие новости.Хорошей новостью является то, что это будет технически работать.Плохая новость заключается в том, что вы, вероятно, не захотите этого делать, потому что BlockingCollection.TryAdd перехватывает возвращаемое значение из базового метода IProducerConsumerCollection.TryAdd и выдает исключение при обнаружении false.Да, это правильно.Он не возвращает false, как вы ожидаете, и вместо этого генерирует исключение.Я должен быть честным, это и удивительно и смешно.Смысл методов TryXXX в том, что они не должны генерировать исключения.Я глубоко разочарован.

5 голосов
/ 06 июня 2014

Помимо предостережения Брайана Гидеона, упомянутого после Обновление , его решение страдает от следующих проблем с производительностью:

  • O (n) операций надОчередь (queue.Contains(item)) сильно влияет на производительность, так как очередь растет.
  • блокирует ограничение параллелизма (о котором он упоминает)

Следующий код улучшает решение Брайана на

  • с использованием хэша, установленного на O (1) lookups
  • , объединяющего 2 структуры данных из пространства имен System.Collections.Concurrent

NBПоскольку ConcurrentHashSet отсутствует, я использую ConcurrentDictionary, игнорируя значения.

В этом редком случае, к счастью, можно просто составить более сложную параллельную структуру данных из нескольких более простых,без добавления замков.Здесь важен порядок операций с двумя параллельными структурами данных.

public class NoDuplicatesConcurrentQueue<T> : IProducerConsumerCollection<T>
{
    private readonly ConcurrentDictionary<T, bool> existingElements = new ConcurrentDictionary<T, bool>();
    private readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();

    public bool TryAdd(T item)
    {
        if (existingElements.TryAdd(item, false))
        {
            queue.Enqueue(item);
            return true;
        }
        return false;
    }

    public bool TryTake(out T item)
    {
        if (queue.TryDequeue(out item))
        {
            bool _;
            existingElements.TryRemove(item, out _);
            return true;
        }
        return false;
    }
    ...
}

NB. Другой способ решения этой проблемы: вам нужен набор, который сохраняет порядок вставки .

1 голос
/ 07 июля 2011

Я бы посоветовал реализовать ваши операции с блокировкой, чтобы вы не читали и не писали элемент таким образом, что это повреждает его, делая его атомарным.Например, с любым IEnumerable:

object bcLocker = new object();

// ...

lock (bcLocker)
{
    bool foundTheItem = false;
    foreach (someClass nextItem in myBlockingColl)
    {
        if (nextItem.Equals(item))
        {
            foundTheItem = true;
            break;
        }
    }
    if (foundTheItem == false)
    {
        // Add here
    }
}
...