Как связать асинхронные задачи с CancellationToken? - PullRequest
2 голосов
/ 18 октября 2019

Я хотел бы объединить некоторые задачи, но условно продолжить выполнение, если CancellationToken меня не уволили.

То, чего я стремлюсь достичь, эквивалентно

var cts = new CancellationTokenSource();
var cancellationToken = cts.Token;
var t = Task.Run(async () => {
    if (cancellationToken.IsCancellationRequested) return;
    await t1();
    if (cancellationToken.IsCancellationRequested) return;
    await t2();
    if (cancellationToken.IsCancellationRequested) return;
    await t3();
    if (cancellationToken.IsCancellationRequested) return;
    await t4();
});
var timeout = Task.Delay(TimeSpan.FromSeconds(4));
var completedTask = await Task.WhenAny(t, timeout);
if (completedTask != t)
{
    cts.Cancel();
    await t;
}

Это то, что у меня уже есть, и оно работает, хотя и многословно.

Ответы [ 4 ]

1 голос
/ 19 октября 2019
var cts = new CancellationTokenSource();
var t = Task.Run(async () => {
     await t1();
     await t2();
     await t3();
     await t4();
 }, cts.Token);

cts.CancelAfter(TimeSpan.FromSeconds(4));

try
{
     await t;
}
catch (OperationCanceledException)
{
     // The cancellation token was triggered, add logic for that
     ....
}
1 голос
/ 18 октября 2019

Похоже, ваша цель - просто остановить выполнение, если вся операция заняла более 4 секунд.

Если вы передавали CancellationToken на t1 / t2 / и т. Д. методы, я бы сказал, вы не могли бы сделать лучше, чем у вас есть. Но если у вас есть, вы можете просто использовать Stopwatch вместо CancellationToken:

var timeout = TimeSpan.FromSeconds(4);
var stopwatch = new Stopwatch();
stopwatch.Start();

await t1();
if (stopwatch.Elapsed > timeout) return;
await t2();
if (stopwatch.Elapsed > timeout) return;
await t3();
if (stopwatch.Elapsed > timeout) return;
await t4();
stopwatch.Stop();

Я предполагаю, что это где-то в методе, где вы можете использовать return, но вы можете изменить при необходимости (вернуть значение, сгенерировать исключение и т. Д.).

0 голосов
/ 20 октября 2019

Вам следует рассмотреть возможность использования Microsoft Reactive Framework (также известной как Rx) - NuGet System.Reactive и добавить using System.Reactive.Linq; - тогда вы можете сделать это:

IObservable<Unit> tasks =
    from x1 in Observable.FromAsync(() => t1())
    from x2 in Observable.FromAsync(() => t2())
    from x3 in Observable.FromAsync(() => t3())
    from x4 in Observable.FromAsync(() => t4())
    select Unit.Default;

IObservable<Unit> timer =
    Observable
        .Timer(TimeSpan.FromSeconds(4.0))
        .Select(x => Unit.Default)

IDisposable subscription =
    Observable
        .Amb(tasks, timer)
        .Subscribe();

Если наблюдаемый таймер срабатывает до завершения задачтогда весь трубопровод отменяется. Никакие ненужные задачи не выполняются.

Если вы хотите отменить вручную, просто наберите subscription.Dispose().

Код также прост и красив.

0 голосов
/ 19 октября 2019

С вашим кодом все в порядке, но когда вы читаете его с первого взгляда, не ясно, что он делает. Поэтому я предлагаю вам инкапсулировать эту логику в служебный метод с описательным именем и параметрами:

public static async Task RunSequentially(IEnumerable<Func<Task>> taskFactories,
    int timeout = Timeout.Infinite, bool onTimeoutAwaitIncompleteTask = false)
{
    using (var cts = new CancellationTokenSource(timeout))
    {
        if (onTimeoutAwaitIncompleteTask)
        {
            await Task.Run(async () =>
            {
                foreach (var taskFactory in taskFactories)
                {
                    if (cts.IsCancellationRequested) throw new TimeoutException();
                    await taskFactory();
                }
            });
        }
        else // On timeout return immediately
        {
            var allSequentially = Task.Run(async () =>
            {
                foreach (var taskFactory in taskFactories)
                {
                    cts.Token.ThrowIfCancellationRequested();
                    var task = taskFactory(); // Synchronous part of task
                    cts.Token.ThrowIfCancellationRequested();
                    await task; // Asynchronous part of task
                }
            }, cts.Token);
            var timeoutTask = new Task(() => {}, cts.Token);
            var completedTask = await Task.WhenAny(allSequentially, timeoutTask);
            if (completedTask.IsCanceled) throw new TimeoutException();
            await completedTask; // Propagate any exception
        }
    }
}

Этот код отличается от вашего в том, что он выдает TimeoutException по истечении времени ожидания. Я думаю, что лучше заставить вызывающую сторону явно обработать это исключение, а не скрывать тот факт, что время ожидания операции истекло. Вызывающая сторона может игнорировать исключение, оставив пустой блок catch:

try
{
    await RunSequentially(new[] { t1, t2, t3, t4 },
        timeout: 4000,
        onTimeoutAwaitIncompleteTask: true);
}
catch (TimeoutException)
{
    // Do nothing
}
...