BlockingCollection Take () Блокирование навсегда - PullRequest
0 голосов
/ 02 ноября 2018

У меня есть простая настройка потребителя производителя с использованием коллекции блокировок. Потребитель сидит в цикле на протяжении нашего приложения, ожидая, пока потребитель поместит элементы в коллекцию, затем извлекает элемент и записывает его в последовательный порт. По какой-то причине collection.Take () блокируется навсегда, когда в коллекции есть элементы. Для этого приложения у нас может быть один или несколько таких активных производителей одновременно. Они ведут себя одинаково независимо.

public class ProducerConsumer 
{
    private Task _backgroundWorker;
    private CancellationTokenSource _cancellationTokenSource;
    private BlockingCollection<Data> _dataQueue;

    public ProducerConsumer() 
    {
        _dataQueue = new BlockingCollection<Data>();
        _cancellationTokenSource = new CancellationTokenSource();
        _backgroundWorker = new Task(() => DoWork(_cancellationTokenSource.Token), TaskCreationOptions.LongRunning);
        _backgroundWorker.Start();
    }

    public void AddData(Data data) 
    {
        _dataQueue.Add(data);
        System.Diagnostics.Debug.WriteLine(_dataQueue.Count);
    }

    private void DoWork(CancellationToken cancellationToken)
    {
        while(!cancellationToken.IsCancellationRequested)
        {
            try
            {
                _dataQueue.Take(cancellationToken); //This is blocking forever

                //DoWork
            }
            catch(OperationCanceledException) { }
            catch(Exception e)
            {
                System.Diagnostics.Debug.WriteLine(e.ToString());
                throw;
            }
        }
    }  
}

При выполнении этого оператора печати увеличивается, поэтому у нас определенно есть данные в коллекции, но по какой-либо причине Take () продолжает блокировать.

Это также не исключение.

Отмена запрашивается с помощью Dispose (), но я не добавил это здесь. Это не называется называться рано.

Я пытался использовать .GetConsumingEnumerable (), и это также навсегда блокирует.

Я неправильно запускаю задание? Могу ли я исчерпать темы?

Я рассмотрел использование BackgroundWorker вместо Задачи, но согласно MSFT Задача предпочтительнее.

Заранее спасибо.

1 Ответ

0 голосов
/ 02 ноября 2018

Прежде всего, я бы не пытался создать собственную реализацию производителя / потребителя, особенно такую, которая не блокирует. Простой сценарий производитель / потребитель может быть легко обработан с помощью ActionBlock . ActionBlock имеет внутреннюю очередь, в которую могут отправлять сообщения несколько одновременно работающих производителей. ActionbBlock будет обрабатывать помещенные в очередь сообщения в фоновом режиме, используя рабочий метод, передаваемый его конструктору:

class SerialWorker
{
    ActionBlock<Data>  _serialBlock;

    public SerialWorker()
    {    
        _serialBlock=new ActionBlock<Data>(data=>DoWork(data));
    }

    //The worker action can be synchronous 
    private void DoWork(Data data)
    {
    }
    //or asynchronous
    private async Task DoWorkAsync(Data data)
    {
    }


    //Producer Code
    //While the application runs :
    public void PostData(Data data)
    {
        _serialBlock.Post(someData);
    }

//When the application finishes 
//Tell the block to shut down and wait for it to process any leftover requests
    public async Task Shutdown()
    {
        _serialBlock.Complete();    
        await _serialBlock.Completion;
    }

Рабочий метод может быть асинхронным, например, new ActionBlock<Data>(data=>DoWorkAsync(data)) будет работать просто отлично. Это позволяет использовать асинхронные методы без блокировки внутри самого работника.

Новые сообщения публикуются с ActionBlock.Post. Когда пришло время завершать работу, приложение должно вызвать Complete(), чтобы уведомить блок действий и дождаться его завершения. ActionBlock прекратит получать больше сообщений и обработает все, что еще осталось в его буфере, перед завершением.

...