Как заставить Dispose ожидать всех асинхронных методов? - PullRequest
1 голос
/ 15 мая 2019

У меня есть одноразовый класс с асинхронными методами.

class Gateway : IDisposable {
  public Gateway() {}
  public void Dispose() {}

  public async Task<Data> Request1 () {...}
  public async Task<Data> Request2 () {...}
  public async Task<Data> Request3 () {...}
}

Мне нужно, чтобы Dispose ждал, пока все запущенные запросы не будут выполнены.

Итак, мне нужно отслеживать все запущенные задачи, или использовать AsyncLock из AsyncEx, или что-то еще?

Обновлено

Как я вижу, кто-то боится блокировать Dispose. Тогда мы могли бы сделать Task WaitForCompletionAsync() или Task CancelAllAsync() методы.

Ответы [ 4 ]

1 голос
/ 15 мая 2019

В настоящее время вам придется добавить CloseAsync метод, который должны вызывать ваши пользователи.

Как только C # 8.0 выпущен, вы можете положиться на IAsyncDisposable интерфейс и его языковую поддержку:

await using (var asyncDisposable GetAsyncDisposable())
{
    // ...
} // await asyncDisposable.DisposeAsync()
1 голос
/ 15 мая 2019

Вот решение для многоразовой поддержки асинхронного удаления. В связи с тем, что .NET Core 3.0 еще не выпущен, я предоставлю код как для текущей версии C # (7.3), так и для бета-версии (8.0).

Как только IDisposable.Dispose() вызывается для объекта, он не блокируется и обеспечит удаление сразу после завершения всех задач.

Исходный код (текущая версия C #, без IAsyncDisposable)

Связанные с использованием:

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

Интерфейс, который можно утилизировать после выполнения всех задач отслеживания:

public interface ITrackingDisposable : IDisposable
{
    //The implementation of the actual disposings
    Task FinishDisposeAsync();
}

Диспетчер, который отслеживает все запущенные задачи и вызывает отложенное удаление в соответствующий момент времени:

public class TrackingDisposer : IDisposable
{
    private readonly LinkedList<Task> _tasks = new LinkedList<Task>();

    private readonly ITrackingDisposable _target;

    public bool IsDisposed { get; private set; } = false;

    //The supported class must implement ITrackingDisposable
    public TrackingDisposer(ITrackingDisposable target)
    => _target = target ?? throw new ArgumentNullException();

    //Add a task to the tracking list, returns false if disposed
    //Without return value
    public bool Track(Func<Task> func, out Task result)
    {
        lock (_tasks)
        {
            if (IsDisposed)
            {
                result = null;
                return false;
            }

            var task = func();
            var node = _tasks.AddFirst(task);

            async Task ending()
            {
                await task;
                var dispose = false;
                lock (_tasks)
                {
                    _tasks.Remove(node);
                    dispose = IsDisposed && _tasks.Count == 0;
                }
                if (dispose)
                {
                    await _target.FinishDisposeAsync();
                }
            }

            result = ending();
        }
        return true;
    }

    //With return value
    public bool Track<TResult>(Func<Task<TResult>> func, out Task<TResult> result)
    {
        lock (_tasks)
        {
            if (IsDisposed)
            {
                result = null;
                return false;
            }

            var task = func();
            var node = _tasks.AddFirst(task);

            async Task<TResult> ending()
            {
                var result = await task;
                var dispose = false;
                lock (_tasks)
                {
                    _tasks.Remove(node);
                    dispose = IsDisposed && _tasks.Count == 0;
                }
                if (dispose)
                {
                    await _target.FinishDisposeAsync();
                }
                return result;
            }

            result = ending();
        }
        return true;
    }

    //The entry of applying for dispose
    public void Dispose()
    {
        var dispose = false;

        lock (_tasks)
        {
            if (IsDisposed)
            {
                return;
            }

            IsDisposed = true;
            dispose = _tasks.Count == 0;
        }

        if (dispose)
        {
            _target.FinishDisposeAsync();
        }
    }
}

Базовый класс, упрощающий реализацию:

public abstract class TrackingDisposable : ITrackingDisposable
{
    private readonly TrackingDisposer _disposer;

    public TrackingDisposable()
    => _disposer = new TrackingDisposer(this);

    protected virtual void FinishDispose() { }

    protected virtual Task FinishDisposeAsync()
    => Task.CompletedTask;

    Task ITrackingDisposable.FinishDisposeAsync()
    {
        FinishDispose();
        return FinishDisposeAsync();
    }

    public void Dispose()
    => _disposer.Dispose();

    protected Task Track(Func<Task> func)
    => _disposer.Track(func, out var result)
        ? result
        : throw new ObjectDisposedException(nameof(TrackingDisposable));

    protected Task<TResult> Track<TResult>(Func<Task<TResult>> func)
    => _disposer.Track(func, out var result)
        ? result
        : throw new ObjectDisposedException(nameof(TrackingDisposable));
}

Демонстрация и тестовый вывод

Класс тестирования:

internal sealed class TestDisposingObject : TrackingDisposable
{
    public Task Job0Async() => Track(async () =>
    {
        await Task.Delay(200);
        Console.WriteLine("Job0 done.");
    });

    public Task<string> Job1Async(int ms) => Track(async () =>
    {
        await Task.Delay(ms);
        return "Job1 done.";
    });

    protected override void FinishDispose()
    => Console.WriteLine("Disposed.");
}

Main:

internal static class Program
{
    private static async Task Main()
    {
        var result0 = default(Task);
        var result1 = default(Task);
        var obj = new TestDisposingObject();
        result0 = obj.Job0Async();
        result1 = obj.Job1Async(100).ContinueWith(r => Console.WriteLine(r.Result));
        obj.Dispose();
        Console.WriteLine("Waiting For jobs done...");
        await Task.WhenAll(result0, result1);
    }
}

Выход:

Waiting For jobs done...
Job1 done.
Job0 done.
Disposed.

Дополнительно, C # 8.0 (с IAsyncDisposable)

Заменить определение типа следующим:

public interface ITrackingDisposable : IDisposable, IAsyncDisposable
{
    Task FinishDisposeAsync();
}

public class TrackingDisposer : IDisposable, IAsyncDisposable
{
    private readonly LinkedList<Task> _tasks = new LinkedList<Task>();

    private readonly ITrackingDisposable _target;

    private readonly TaskCompletionSource<object> _disposing = new TaskCompletionSource<object>();

    public bool IsDisposed { get; private set; } = false;

    public TrackingDisposer(ITrackingDisposable target)
    => _target = target ?? throw new ArgumentNullException();

    public bool Track(Func<Task> func, out Task result)
    {
        lock (_tasks)
        {
            if (IsDisposed)
            {
                result = null;
                return false;
            }

            var task = func();
            var node = _tasks.AddFirst(task);

            async Task ending()
            {
                await task;
                var dispose = false;
                lock (_tasks)
                {
                    _tasks.Remove(node);
                    dispose = IsDisposed && _tasks.Count == 0;
                }
                if (dispose)
                {
                    await _target.FinishDisposeAsync();
                    _disposing.SetResult(null);
                }
            }

            result = ending();
        }
        return true;
    }

    public bool Track<TResult>(Func<Task<TResult>> func, out Task<TResult> result)
    {
        lock (_tasks)
        {
            if (IsDisposed)
            {
                result = null;
                return false;
            }

            var task = func();
            var node = _tasks.AddFirst(task);

            async Task<TResult> ending()
            {
                var result = await task;
                var dispose = false;
                lock (_tasks)
                {
                    _tasks.Remove(node);
                    dispose = IsDisposed && _tasks.Count == 0;
                }
                if (dispose)
                {
                    await _target.FinishDisposeAsync();
                    _disposing.SetResult(null);
                }
                return result;
            }

            result = ending();
        }
        return true;
    }

    public void Dispose()
    {
        var dispose = false;

        lock (_tasks)
        {
            if (IsDisposed)
            {
                return;
            }

            IsDisposed = true;
            dispose = _tasks.Count == 0;
        }

        if (dispose)
        {
            _target.FinishDisposeAsync();
            _disposing.SetResult(null);
        }
    }

    public ValueTask DisposeAsync()
    {
        Dispose();
        return new ValueTask(_disposing.Task);
    }
}

public abstract class TrackingDisposable : ITrackingDisposable
{
    private readonly TrackingDisposer _disposer;

    public TrackingDisposable()
    => _disposer = new TrackingDisposer(this);

    protected virtual void FinishDispose() { }

    protected virtual Task FinishDisposeAsync()
    => Task.CompletedTask;

    Task ITrackingDisposable.FinishDisposeAsync()
    {
        FinishDispose();
        return FinishDisposeAsync();
    }

    public void Dispose()
    => _disposer.Dispose();

    public ValueTask DisposeAsync() => _disposer.DisposeAsync();

    protected Task Track(Func<Task> func)
    => _disposer.Track(func, out var result)
        ? result
        : throw new ObjectDisposedException(nameof(TrackingDisposable));

    protected Task<TResult> Track<TResult>(Func<Task<TResult>> func)
    => _disposer.Track(func, out var result)
        ? result
        : throw new ObjectDisposedException(nameof(TrackingDisposable));
}

Test Main:

internal static class Program
{
    private static async Task Main()
    {
        await using var obj = new TestDisposingObject();
        _ = obj.Job0Async();
        _ = obj.Job1Async(100).ContinueWith(r => Console.WriteLine(r.Result));
        Console.WriteLine("Waiting For jobs done...");
    }
}
0 голосов
/ 17 мая 2019

Утилизация и ожидание завершения - разные вещи.Поэтому я бы предпочел выдать исключение, когда задачи еще выполняются.

Я написал пример с Nito.AsyncEx.AsyncConditionVariable.Я не проверял это, но я думаю, что это должно работать.Просто используйте Completion.WaitAsync().

Также я рекомендую эту статью: https://blog.stephencleary.com/2013/03/async-oop-6-disposal.html

class Gateway : IDisposable {

  private int runningTaskCount;
  public AsyncConditionVariable Completion { get; } = new AsyncConditionVariable( new AsyncLock() );

  public Gateway() {
  }
  public void Dispose() {
    if (runningTaskCount != 0) throw new InvalidOperationException( "You can not call this method when tasks are running" );
  }

  public async Task<Data> Request1 () {
    BeginTask();
    ...
    EndTask();
  }

  private void BeginTask() {
    Interlocked.Increment( ref runningTaskCount );
  }
  private void EndTask() {
    var result = Interlocked.Decrement( ref runningTaskCount );
    if (result == 0) Completion.NotifyAll();
  }

}
0 голосов
/ 15 мая 2019

Проблема здесь в том, что нет асинхронной версии Dispose() (пока).Поэтому вы должны спросить себя - что вы ожидаете от звонка Dispose() или когда блок using закончится ....?Другими словами, каково требование?

Вам может потребоваться Dispose, чтобы дождаться выполнения всех нерешенных задач и затем выполнить свою работу.Но Dispose не может использовать await (это не асинхронно).Лучшее, что он может сделать, это вызвать Result, чтобы заставить задачу завершиться, но это будет блокирующий вызов, и если какая-либо из асинхронных задач ожидает чего-либо еще, она может легко зайти в тупик.

Вместо этогоЯ предлагаю следующее требование: когда вызывающий абонент вызывает Dispose(), вызов помечает шлюз, который должен быть удален, а затем немедленно возвращается, будучи уверенным в том, что механизм удаления активируется после завершения последней задачи.

Если это требование является адекватным, возможно , но немного грязно.Вот как это делается:

  1. Каждый раз, когда вызывается метод (например, Request), «оборачивайте» возвращенную задачу в другую задачу, которая включает проверку, чтобы убедиться, что вызывающая сторона запросилаШлюз, который должен быть удален.

  2. Если запрошено удаление, продолжайте и утилизируйте прямо там и сейчас, прежде чем пометить задачу как выполненную.Таким образом, когда вызывающая сторона ожидает задания, она принудительно утилизирует.

Вот моя реализация.Я сказал вам, что это было некрасиво.

class Gateway : IDisposable 
{
    protected readonly HttpClient _client = new HttpClient();  //an inner class that must be disposed when Gateway disposes
    protected bool _disposalRequested = false;
    protected bool _disposalCompleted = false;
    protected int _tasksRunning = 0;


    public void Dispose()
    {
        Console.WriteLine("Dispose() called.");
        _disposalRequested = true;  
        if (_tasksRunning == 0)
        {
            Console.WriteLine("No running tasks, so disposing immediately.");
            DisposeInternal();
        }
        else
        {
            Console.WriteLine("There are running tasks, so disposal shall be deferred.");
        }
    }

    protected void DisposeInternal()
    {
        if (!_disposalCompleted)
        {
            Console.WriteLine("Disposing");
            _client.Dispose();
            _disposalCompleted = true;
        }
    }

    protected async Task<T> AddDisposeWrapper<T>(Func<Task<T>> func)
    {
        if (_disposalRequested) throw new ObjectDisposedException("Disposal has already been requested. No new requests can be handled at this point.");

        _tasksRunning++;
        var result = await func();
        _tasksRunning--;
        await DisposalCheck();
        return result;
    }

    protected async Task DisposalCheck()
    {
        if (_disposalRequested) DisposeInternal();
    }

    public Task<Data> Request1()
    {
        return AddDisposeWrapper
        (
            Request1Internal
        );
    }

    public Task<Data> Request2()
    {
        return AddDisposeWrapper
        (
            Request2Internal
        );
    }

    protected async Task<Data> Request1Internal()
    {
        Console.WriteLine("Performing Request1 (slow)");
        await Task.Delay(3000);
        Console.WriteLine("Request1 has finished. Returning new Data.");
        return new Data();
    }

    protected async Task<Data> Request2Internal()
    {
        Console.WriteLine("Performing Request2 (fast)");
        await Task.Delay(1);
        Console.WriteLine("Request2 has finished. Returning new Data.");
        return new Data();
    }
}

Вот некоторый тестовый код:

public class Program
{
    public static async Task Test1()
    {
        Task<Data> task;
        using (var gateway = new Gateway())
        {
            task = gateway.Request1();
            await Task.Delay(1000);
        }
        var data = await task;
        Console.WriteLine("Test 1 is complete.");
    }

    public static async Task Test2()
    {
        Task<Data> task;
        using (var gateway = new Gateway())
        {
            task = gateway.Request2();
            await Task.Delay(1000);
        }
        var data = await task;
        Console.WriteLine("Test 2 is complete.");
    }

    public static async Task MainAsync()
    {
        await Test1();
        await Test2();
    }

    public static void Main()
    {
        MainAsync().GetAwaiter().GetResult();
        Console.WriteLine("Run completed at {0:yyyy-MM-dd HH:mm:ss}", DateTime.Now);
    }
}

Это вывод:

Performing Request1 (slow)
Dispose() called.
There are running tasks, so disposal shall be deferred.
Request1 has finished. Returning new Data.
Disposing
Test 1 is complete.
Performing Request2 (fast)
Request2 has finished. Returning new Data.
Dispose() called.
No running tasks, so disposing immediately.
Disposing
Test 2 is complete.
Run completed at 2019-05-15 00:34:46

А вот моя скрипка,в случае, если вы хотите попробовать это: Ссылка

Я не очень рекомендую это (если что-то будет утилизировано, вы должны лучше контролировать его срок службы), но этобыло забавно писать этот код для вас.

Примечание: из-за использования подсчета ссылок потребуется дополнительная работа, чтобы сделать это решение поточно-ориентированным или сделать его устойчивым к случаю, когда один из методов запроса Gatewayвыдает исключение.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...