Является ли BlockingCollection защищенным от потоков? - PullRequest
2 голосов
/ 14 января 2020
BlockingCollection<int> blockingCollection = new BlockingCollection<int>();
    // create and start a producer
    Task.Factory.StartNew(() => {
        // put items into the collectioon
        for (int i = 0; i < 1000; i++)
        {
            blockingCollection.Add(i);
        }
        // mark the collection as complete
        blockingCollection.CompleteAdding();
    });
    // create and start a producer
    Task.Factory.StartNew(() => {
        while (!blockingCollection.IsCompleted)
        {
            // take an item from the collection
            int item = blockingCollection.Take();
            // print out the item
            Console.WriteLine("Taskid{1} Item {0}", item, Task.CurrentId);
        }
    });

    // create and start a producer
    Task.Factory.StartNew(() => {
        while (!blockingCollection.IsCompleted)   // if the blockingCollection is not completed
        {
            // take an item from the collection
            int item = blockingCollection.Take();  // but in here, all the items have been taken by other thread, this line will wait forever?
            // print out the item
            Console.WriteLine("Taskid{1} Item {0}", item,Task.CurrentId);
        }
    });
    Console.ReadLine();

Ключевые строки кода:

    while (!blockingCollection.IsCompleted)   // if the blockingCollection is not completed
    int item = blockingCollection.Take();  // but in here, all the items have been taken by other thread, this line will wait forever?

Ответы [ 3 ]

3 голосов
/ 14 января 2020

Да, BlockingCollection<T> является потокобезопасным .

Чтобы избежать проверки IsCompleted, вы можете поставить тот же лог c с помощью GetConsumingEnumerable(), который будет дождитесь завершения блокировки коллекции:

  // Producer:

  Task.Run(() => { 
    for (int i = 0; i < 1000; i++)
      blockingCollection.Add(i);

    blockingCollection.CompleteAdding();
  });    

  // Consumers:

  Task.Run(() => {
    foreach (var item in blockingCollection.GetConsumingEnumerable()) 
      Console.WriteLine("Taskid{1} Item {0}", item, Task.CurrentId);
  });

  Task.Run(() => {
    foreach (var item in blockingCollection.GetConsumingEnumerable()) 
      Console.WriteLine("Taskid{1} Item {0}", item, Task.CurrentId);
  });
1 голос
/ 14 января 2020

Вопрос 1 - Это потокобезопасно?

Цитирование документации BlockingCollection

Предоставляет возможности блокировки и ограничения для поточно-безопасных коллекции, которые реализуют IProducerConsumerCollection.

Вопрос 2 - Будет ли Take() принимать вечно?

Из официальной документации: Документы MS по блокировке коллекции - взять

OperationCanceledException

BlockingCollection пуст и помечен как завершенный в отношении дополнений.

Так, когда Take вызывается Завершено BlockingCollection исключение, которое вы можете обработать соответственно

0 голосов
/ 14 января 2020

Класс BlockingCollection является потокобезопасным в том смысле, что его внутреннее состояние защищено от повреждения при одновременном вызове несколькими потоками. Это не поточно-безопасный в том смысле, что его простое присутствие благословляет потокобезопасность небезопасным блоком кода!

Ваш код небезопасен, потому что между вызовами IsCompleted и Take, другой поток может вызвать метод Complete и изменить состояние BlockingCollection с незавершенного на завершенное. Поточно-безопасным решением было бы использовать API, который объединяет IsCompleted и Take в операции atomi c. В классе BlockingCollection такого API нет, но вы можете достичь желаемого с помощью метода GetConsumingEnumerable. Этот метод возвращает IEnumerable, который блокирует при ожидании доступных элементов, и завершается, когда завершается BlockingCollection. На самом деле это стандартный и предпочтительный способ потребления этой коллекции.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...