Как перезапустить асинхронный метод? Отмените предыдущий запуск, дождитесь его и запустите - PullRequest
0 голосов
/ 08 января 2019

У меня есть метод RestartAsync, который запускает метод DoSomethingAsync. Когда RestartAsync вызывается снова, он должен отменить DoSomethingAsync и ждать, пока он не будет завершен (DoSomethingAsync НЕ может быть отменен синхронно, и его НЕ следует вызывать, когда предыдущая задача еще выполняется).

Мой первый подход выглядел так:

public async Task RestartTest()
{
    Task[] allTasks = { RestartAsync(), RestartAsync(), RestartAsync() } ;
    await Task.WhenAll(allTasks);
}

private async Task RestartAsync()
{
    _cts.Cancel();
    _cts = new CancellationTokenSource();
    await _somethingIsRunningTask;

    _somethingIsRunningTask = DoSomethingAsync(_cts.Token);

    await _somethingIsRunningTask;
}

private static int _numberOfStarts;

private async Task DoSomethingAsync(CancellationToken cancellationToken)
{
    _numberOfStarts++;
    int numberOfStarts = _numberOfStarts;

    try
    {
        Console.WriteLine(numberOfStarts + " Start to do something...");
        await Task.Delay(TimeSpan.FromSeconds(1)); // This operation can not be cancelled.
        await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
        Console.WriteLine(numberOfStarts + " Finished to do something...");
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine(numberOfStarts + " Cancelled to do something...");
    }
}

Фактический вывод при трехкратном вызове RestartAsync выглядит следующим образом (обратите внимание, что второй запуск отменяет и ожидает первый, но в то же время третий запуск также ожидает первый вместо отмены и ожидания второго):

1 Start to do something...
1 Cancelled to do something...
2 Start to do something...
3 Start to do something...
2 Finished to do something...
3 Finished to do something...

Но я хочу добиться этого:

1 Start to do something...
1 Cancelled to do something...
2 Start to do something...
2 Cancelled to do something...
3 Start to do something...
3 Finished to do something...

Мое текущее решение следующее:

private async Task RestartAsync()
{
    if (_isRestarting)
    {
        return;
    }

    _cts.Cancel();
    _cts = new CancellationTokenSource();

    _isRestarting = true;
    await _somethingIsRunningTask;
    _isRestarting = false;

    _somethingIsRunningTask = DoSomethingAsync(_cts.Token);

    await _somethingIsRunningTask;
}

Тогда я получаю этот вывод:

1 Start to do something...
1 Cancelled to do something...
2 Start to do something...
2 Finished to do something...

Теперь, по крайней мере, DoSomethingAsync не запускается, пока он еще выполняется (обратите внимание, что третий запуск игнорируется, что не имеет большого значения, потому что в противном случае он должен отменить второй запуск).

Но это решение нехорошо, и мне приходится повторять этот уродливый шаблон везде, где я хочу такого поведения. Есть ли какой-нибудь хороший шаблон или структура для такого рода механизма перезапуска?

Ответы [ 2 ]

0 голосов
/ 09 января 2019

Я думаю, что проблема в методе RestartAsync. Помните, что асинхронный метод немедленно вернет задачу, если он чего-то ожидает, поэтому второй RestartAsync фактически вернется, прежде чем поменять свою задачу, затем третий RestartAsync и ожидает задачу сначала RestartAsync.

Также, если RestartAsync будет выполняться несколькими потоками, вы можете обернуть _cts и _somethingIsRunningTask в один и обменять значения с помощью метода Interlocked.Exchange, чтобы предотвратить состояние гонки.

Вот мой пример кода, не полностью протестированный:

public class Program
{
    static async Task Main(string[] args)
    {
        RestartTaskDemo restartTaskDemo = new RestartTaskDemo();

        Task[] tasks = { restartTaskDemo.RestartAsync( 1000 ), restartTaskDemo.RestartAsync( 1000 ), restartTaskDemo.RestartAsync( 1000 ) };
        await Task.WhenAll( tasks );

        Console.ReadLine();
    }
}

public class RestartTaskDemo
{
    private int Counter = 0;

    private TaskEntry PreviousTask = new TaskEntry( Task.CompletedTask, new CancellationTokenSource() );

    public async Task RestartAsync( int delay )
    {            
        TaskCompletionSource<bool> taskCompletionSource = new TaskCompletionSource<bool>();
        CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();

        TaskEntry previousTaskEntry = Interlocked.Exchange( ref PreviousTask, new TaskEntry( taskCompletionSource.Task, cancellationTokenSource ) );

        previousTaskEntry.CancellationTokenSource.Cancel();
        await previousTaskEntry.Task.ContinueWith( Continue );

        async Task Continue( Task previousTask )
        {
            try
            {
                await DoworkAsync( delay, cancellationTokenSource.Token );
                taskCompletionSource.TrySetResult( true );
            }
            catch( TaskCanceledException )
            {
                taskCompletionSource.TrySetCanceled();
            }
        }            
    }

    private async Task DoworkAsync( int delay, CancellationToken cancellationToken )
    {
        int count = Interlocked.Increment( ref Counter );
        Console.WriteLine( $"Task {count} started." );

        try
        {
            await Task.Delay( delay, cancellationToken );
            Console.WriteLine( $"Task {count} finished." );
        }
        catch( TaskCanceledException )
        {
            Console.WriteLine( $"Task {count} cancelled." );
            throw;
        }
    }

    private class TaskEntry
    {
        public Task Task { get; }

        public CancellationTokenSource CancellationTokenSource { get; }

        public TaskEntry( Task task, CancellationTokenSource cancellationTokenSource )
        {
            Task = task;
            CancellationTokenSource = cancellationTokenSource;
        }
    }
}
0 голосов
/ 09 января 2019

Это проблема параллелизма. Итак, вам нужно решение проблем параллелизма: семафор.

В общем случае вы также должны учитывать, когда выполняемый метод выдает OperationCanceledException:

private async Task DoSomethingAsync(CancellationToken cancellationToken)
{
    _numberOfStarts++;
    int numberOfStarts = _numberOfStarts;

    try
    {
        Console.WriteLine(numberOfStarts + " Start to do something...");
        await Task.Delay(TimeSpan.FromSeconds(1)); // This operation can not be cancelled.
        await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
        Console.WriteLine(numberOfStarts + " Finished to do something...");
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine(numberOfStarts + " Cancelled to do something...");
        throw;
    }
}

Попробуйте это:

private SemaphoreSlim semaphore = new SemaphoreSlim(1);
private (CancellationTokenSource cts, Task task)? state;

private async Task RestartAsync()
{
    Task task = null;

    await this.semaphore.WaitAsync();

    try
    {
        if (this.state.HasValue)
        {
            this.state.Value.cts.Cancel();
            this.state.Value.cts.Dispose();

            try
            {
                await this.state.Value.task;
            }
            catch (OperationCanceledException)
            {
            }

            this.state = null;
        }

        var cts = new CancellationTokenSource();
        task = DoSomethingAsync(cts.Token);

        this.state = (cts, task);
    }
    finally
    {
        this.semaphore.Release();
    }

    try
    {
        await task;
    }
    catch (OperationCanceledException)
    {
    }
}
...