BufferBlock.ReceiveAsyn c (время ожидания) зависает, но BufferBlock.ReceiveAsyn c () работает нормально - PullRequest
0 голосов
/ 25 апреля 2020

Либо я делаю что-то действительно неправильное, но приведенное ниже никогда не возвращает, оно навсегда зависает на ReceiveAsync, несмотря на указание времени ожидания в 1 секунду.

Я ожидаю, что оно вернет нулевое значение после истечения времени ожидания ,

/* snipped MyContainer class */

private readonly BufferBlock<byte[]> queue = new BufferBlock<byte[]>();

public async Task ExecuteAsync(CancellationToken stoppingToken)
{
     // makes no difference if creating with TaskCreationOptions.LongRunning
     await Task
            .Factory
            .StartNew(async () =>
            {

              while (stoppingToken.IsCancellationRequested == false)
              {                     
                 // we get here OK, but no further if i use TimeSpan for delay
                 // without Timespan i.e. ReceiveAsync() only, it does **not** hang
                 var item = await 
                             this
                               .queue
                               .ReceiveAsync(TimeSpan.FromMilliseconds(1000));

                      // process it, but we never get here we sleep forever
                      await ProcessAsync(item);
                    }
             } /*,TaskCreationOptions.LongRunning*/);

     // we get here and print the below OK
     Console.WriteLine("thread created and running");
}

// this is called by the original (or some other) thread
// either if i call this or not, the above thread code still locks on ReceiveAsync
public void Add(byte[] data)
{
   Console.WriteLine("adding");
   this.queue.Post(data);
   Console.WriteLine("done"); // gets posted OK     
}

Важное обновление - работает нормально, если я не указываю задержку

var item = await this.queue.ReceiveAsync());

Код работает хорошо, если я удаляю задержку, однако я делаю некоторые фоновое обслуживание каждую секунду (для счетчиков пакетов и т. д. c), поэтому важно проснуться, если ничего не получено в течение 1 секунды.

Другие примечания:

Я вызов приведенного выше кода с универсального c точка net рабочего хоста:

public class Worker : BackgroundService
{
    private readonly MyContainer containerClass;
    private readonly ILogger<Worker> logger;

    public Worker(MyContainer containerClass, ILogger<Worker> logger)
    {
        this.containerClass = containerClass;
        this.logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        this.containerClass.ExecuteAsync(stoppingToken);

        while (!stoppingToken.IsCancellationRequested)
        {                
            this.logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
            await Task.Delay(1000, stoppingToken);
        }
    }
}

Вышеуказанный код вызывается после того, как рабочий построен IHostBuilder и я позвонил Host.Run().

Насколько я понимаю (над чем мне явно нужно поработать!), Поскольку я создаю поток, он должен работать совершенно независимо от (и не блокироваться) потока, который его создал / вызвал ... другими словами, он должен быть в состоянии вызывать ReceiveAsyn c внутри самого потока без блокировки.

Ответы [ 2 ]

2 голосов
/ 26 апреля 2020

Использование Task.Factory.StartNew с асинхронным делегатом c создает вложенную задачу:

Task<Task> nestedTask = Task.Factory.StartNew(async () => { //...

Вы ожидаете внешнюю задачу, но не внутреннюю, поэтому внутренняя задача становится забыть задачу Можно ожидать обеих задач в одной строке, дважды используя оператор await:

await await Task.Factory.StartNew(async () => { //...

В качестве альтернативы вы можете объединить две задачи в одну, используя метод Unwrap .

await Task.Factory.StartNew(async () => { /* ... */ }).Unwrap();

... или даже лучше использовать метод Task.Run вместо Task.Factory.StartNew, поскольку первый понимает асинхронные c делегаты и выполняет развертывание для Вы:

await Task.Run(async () => { //...

Если вас интересует разница между Task.Factory.StartNew и Task.Run, вы можете прочитать информативную статью здесь .

0 голосов
/ 25 апреля 2020

Спасибо всем, кто откликнулся и, наконец, Энрико (не стесняйтесь копировать / вставлять, и я назначу вам ответ), код на самом деле работал нормально.

Исключение TimeoutException вызывалось, но не было не пойман моим кодом или Visual Studio.

Включение всех исключений CLR согласно https://docs.microsoft.com/en-us/visualstudio/debugger/managing-exceptions-with-the-debugger?view=vs-2019 исключению стало вызываться.

Затем я обработал исключение в коде, и смог продолжить, как требовал мой дизайн:

public Task ExecuteAsync(CancellationToken stoppingToken)
{
    return Task
            .Factory
            .StartNew(async () => {

                while (stoppingToken.IsCancellationRequested == false)
                {
                    try
                    {
                        var ts = TimeSpan.FromSeconds(UpdateFrequencySeconds);
                        var item = await this.queue.ReceiveAsync(ts);
                        await ProcessAsync(item);
                    }
                    catch (TimeoutException)
                    {
                        // this is ok, timer expired 
                    }
                    catch (Exception e)
                    {
                        this.logger.LogError(e.ToString());
                    }

                    UpdateCounters();
                }

                await StopAsync();
              },
              stoppingToken,
              TaskCreationOptions.LongRunning, 
              TaskScheduler.Default)
              .Unwrap();
 }
...