Выявление, когда продолжительные асинхронные задачи завершены - PullRequest
1 голос
/ 09 апреля 2019

[Обновлено 18 апреля 2018 г. с примером LinqPad - см. Конец]

Мое приложение получает список заданий:

var jobs = await myDB.GetWorkItems();

(NB: мы везде используем .ConfigureAwait (false), я просто не показываю его в этих фрагментах псевдокода.)

Для каждой работы мы создаем долгосрочную задачу. Однако мы не хотим ждать завершения этой длительной задачи.

jobs.ForEach(job =>
{
    var module = Factory.GetModule(job.Type);
    var task = Task.Run(() => module.ExecuteAsync(job.Data));
    this.NonAwaitedTasks.Add(task, module);
};

Задача и связанный с ней экземпляр модуля добавляются в ConcurrentDictionary, чтобы они не выходили за пределы области видимости.

В другом месте у меня есть другой метод, который иногда вызывается, который содержит следующее:

foreach (var entry in this.NonAwaitedTasks.Where(e => e.Key.IsCompleted))
{
    var module = entry.Value as IDisposable;
    module?.Dispose();
    this.NonAwaitedTasks.Remove(entry.Key);
}

(Обратите внимание, что NonAwaitedTasks дополнительно блокируется с помощью SemaphoreSlim ...)

Таким образом, идея заключается в том, что этот метод найдет все те задачи, которые выполнили, а затем избавится от соответствующего модуля и удалит их из этого словаря.

Однако ....

Во время отладки в Visual Studio 2017 я извлекаю одно задание из БД, и в то время как я отнимаю время для отладки в единственном модуле, который был создан, для этого модуля вызывается Dispose. Глядя в Callstack, я вижу, что Dispose был вызван в методе выше, и это потому, что задача имеет IsCompleted == true. Но, очевидно, это не может быть завершено, потому что я все еще отлаживаю его.

  • Является ли свойство .IsCompleted неправильным свойством для проверки?
  • Это просто артефакт отладки в Visual Studio?
  • Я поступаю неправильно?

Дополнительная информация

В комментариях ниже меня попросили предоставить некоторую дополнительную информацию о потоке, потому что то, что я описал, казалось невозможным (и действительно, я надеялся, что этого не может быть). Ниже приведена сокращенная версия моего кода (я удалил проверки на токен отмены и защитное кодирование, но ничего, что влияет на ход).

Точка входа в приложение

Это служба Windows. В OnStart () есть следующая строка:

this.RunApplicationTask = 
Task.Run(() => myApp.DoWorkAsync().ConfigureAwait(false), myService.CancelSource.Token);

«RunApplicationTask» - это просто свойство, позволяющее держать задачу в области действия в течение срока службы Службы.

DoWorkAsync ()

public async Task DoWorkAsync()
{
    do
    {
        await this.ExecuteSingleIterationAsync().ConfigureAwait(false);
        await Task.Delay(TimeSpan.FromSeconds(5)).ConfigureAwait(false);
    }
    while (myApp.ServiceCancellationToken.IsCancellationRequested == false);

    await Task.WhenAll(this.NonAwaitedTasks.Keys).ConfigureAwait(false);
    await this.ClearCompletedTasksAsync().ConfigureAwait(false);
    this.WorkItemsTaskCompletionSource.SetResult(true);

    return;
}

Так что, пока я отлаживаю, это итерация DO-LOOP, она не попадает в Task.WhenAll (....).

Обратите внимание, что после вызова запроса на отмену и выполнения всех задач я вызываю ClearCompletedTasksAsync (). Подробнее об этом позже ....

ExecuteSingleIterationAsync

private async Task ExecuteSingleIterationAsync()
{
    var getJobsResponse = await DB.GetJobsAsync().ConfigureAwait(false);
    await this.ProcessWorkLoadAsync(getJobsResponse.Jobs).ConfigureAwait(false);
    await this.ClearCompletedTasksAsync().ConfigureAwait(false);
}

ProcessWorkLoadAsync

private async Task ProcessWorkLoadAsync(IList<Job> jobs)
{
    if (jobs.NoItems())
    {
        return ;
    }

    jobs.ForEach(job =>
    {
        // The processor instance is disposed of when removed from the NonAwaitedTasks collection.
        IJobProcessor processor = ProcessorFactory.GetProcessor(workItem, myApp.ServiceCancellationToken);
        try
        {
            var task = Task.Run(() => processor.ExecuteAsync(job).ConfigureAwait(false), myApp.ServiceCancellationToken);
            this.NonAwaitedTasks.Add(task, processor);
        }
        catch (Exception e)
        {
            ...
        }
    });

    return;
}

Каждый процессор реализует следующий метод интерфейса: Task ExecuteAsync (Job job);

Пока я в ExecuteAsync, вызывается .Dispose () для используемого экземпляра процессора.

ProcessorFactory.GetProcessor ()

public static IJobProcessor GetProcessor(Job job, CancellationToken token)
{
    .....
    switch (someParamCalculatedAbove)
    {
        case X:
            {
                return new XProcessor(...);
            }

        case Y:
            {
                return new YProcessor(...);
            }

        default:
            {
                return null;
            }
    }
}

Итак, мы получаем новый экземпляр.

ClearCompletedTasksAsync ()

private async Task ClearCompletedTasksAsync()
{
    await myStatic.NonAwaitedTasksPadlock.WaitAsync().ConfigureAwait(false);
    try
    {
        foreach (var taskEntry in this.NonAwaitedTasks.Where(entry => entry.Key.IsCompleted).ToArray())
        {
            var processorInstance = taskEntry.Value as IDisposable;
            processorInstance?.Dispose();
            this.NonAwaitedTasks.Remove(taskEntry.Key);
        }
    }
    finally
    {
        myStatic.NonAwaitedTasksPadlock.Release();
    }
}

Это называется каждой итерацией Do-Loop. Его целью является обеспечение того, чтобы список ожидаемых задач оставался небольшим.

И это все ... Утилита, кажется, вызывается только при отладке.

Пример LinqPad

async Task Main()
{
    SetProcessorRunning();

    await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false);

    do
    {
        foreach (var entry in NonAwaitedTasks.Where(e => e.Key.IsCompleted).ToArray())
        {
            "Task is completed, so will dispose of the Task's processor...".Dump();
            var p = entry.Value as IDisposable;
            p?.Dispose();
            NonAwaitedTasks.Remove(entry.Key);
        }
    }
    while (NonAwaitedTasks.Count > 0);
}

// Define other methods and classes here

public void SetProcessorRunning()
{
    var p = new Processor();

    var task = Task.Run(() => p.DoWorkAsync().ConfigureAwait(false));
    NonAwaitedTasks.Add(task, p);
}

public interface IProcessor
{
    Task DoWorkAsync();
}

public static Dictionary<Task, IProcessor> NonAwaitedTasks = new Dictionary<Task, IProcessor>();

public class Processor : IProcessor, IDisposable
{
    bool isDisposed = false;
    public void Dispose()
    {
        this.isDisposed = true;
        "I have been disposed of".Dump();
    }

    public async Task DoWorkAsync()
    {
        await Task.Delay(TimeSpan.FromSeconds(5)).ConfigureAwait(false);

        if (this.isDisposed)
        {
            $"I have been disposed of (isDispose = {this.isDisposed}) but I've not finished work yet...".Dump();
        }

        await Task.Delay(TimeSpan.FromSeconds(5)).ConfigureAwait(false);
    }
}

Выход:

Задача завершена, поэтому будет утилизировать процессор Задачи ...

Я был утилизирован

Я был удален (isDispose = True), но я не закончил работу еще ...

1 Ответ

2 голосов
/ 19 апреля 2019

Ваша проблема в этой строке:

var task = Task.Run(() => p.DoWorkAsync().ConfigureAwait(false));

Наведите указатель мыши на var и посмотрите, что это за тип.

Task.Run понимает async делегатов благодаря специальным правилам "развертывания задач" для Func<Task<Task>> и друзей. Но у него не будет специальной распаковки для Func<ConfiguredTaskAwaitable>.

Вы можете думать об этом таким образом; с кодом выше:

  1. p.DoWorkAsync() возвращает Task.
  2. Task.ConfigureAwait(false) возвращает ConfiguredTaskAwaitable.
  3. Таким образом, Task.Run предлагается запустить эту функцию, которая создает ConfiguredTaskAwaitable в потоке пула потоков.
  4. Таким образом, тип возвращаемого значения Task.Run равен Task<ConfiguredTaskAwaitable> - задача, которая завершается, как только создается ConfiguredTaskAwaitable. Когда он создан - не когда он завершается.

В этом случае ConfigureAwait(false) ничего не делает, потому что await не нужно настраивать. Таким образом, вы можете удалить его:

var task = Task.Run(() => p.DoWorkAsync());

Кроме того, как упомянул Servy, если вам не нужно для запуска DoWorkAsync в потоке пула потоков, вы также можете пропустить Task.Run:

var task = p.DoWorkAsync();
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...