Как заставить IAsyncEnumerable уважать CancellationToken - PullRequest
3 голосов
/ 04 октября 2019

Редактировать: Требования этого вопроса изменились. См. Ниже раздел Обновление .

У меня есть метод асинхронного итератора, который создает IAsyncEnumerable<int> (поток чисел), одно число каждые 200 мсек. Вызывающая сторона этого метода потребляет поток, но хочет остановить перечисление через 1000 мс. Таким образом, используется CancellationTokenSource, и токен передается в качестве аргумента в метод расширения WithCancellation. Но знак не соблюдается. Перечисление продолжается до тех пор, пока не будут использованы все числа:

static async IAsyncEnumerable<int> GetSequence()
{
    for (int i = 1; i <= 10; i++)
    {
        await Task.Delay(200);
        yield return i;
    }
}

var cts = new CancellationTokenSource(1000);
await foreach (var i in GetSequence().WithCancellation(cts.Token))
{
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > {i}");
}

Вывод:

12: 55: 17.506> 1
12: 55: 17.739> 2
12: 55: 17.941> 3
12: 55: 18.155> 4
12: 55: 18.367> 5
12: 55: 18.570> 6
12: 55: 18.772> 7
12: 55: 18.973> 8
12: 55: 19.174> 9
12: 55: 19.376> 10

Ожидаемый результат - TaskCanceledException после числа 5. Кажется, я неправильно понял, что на самом деле делает WithCancellation. Метод просто передает предоставленный токен методу итератора, если этот метод принимает его. В противном случае, как и в методе GetSequence() в моем примере, токен игнорируется. Я полагаю, что решение в моем случае состоит в том, чтобы вручную опросить токен внутри тела перечисления:

var cts = new CancellationTokenSource(1000);
await foreach (var i in GetSequence())
{
    cts.Token.ThrowIfCancellationRequested();
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > {i}");
}

Это просто и хорошо работает. Но в любом случае мне интересно, можно ли было бы создать метод расширения, который будет делать то, что я ожидал от WithCancellation, чтобы запекать токен внутри следующего перечисления. Это подпись необходимого метода:

public static IAsyncEnumerable<T> WithEnforcedCancellation<T>(
    this IAsyncEnumerable<T> source, CancellationToken cancellationToken)
{
    // Is it possible?
}

Обновление : Кажется, что когда я задавал этот вопрос, у меня было неправильное понимание цели всей концепции отмены,У меня сложилось впечатление, что отмена предназначена для разрыва цикла после ожидания из MoveNextAsync, тогда как настоящая цель - отменить самого ожидающего . В моем тривиальном примере ожидание длится всего 200 мсек, но в реальном мире ожидание может быть намного длиннее, даже бесконечным. После осознания этого мой вопрос в его нынешней форме почти не имеет значения, и я должен либо удалить его и открыть новый с тем же названием, либо изменить требования существующего вопроса. Оба варианта так или иначе плохи.

Я решил пойти со вторым вариантом. Поэтому я не принимаю принятый в настоящее время ответ и прошу найти новое решение для более сложной проблемы обеспечения отмены таким образом, чтобы это имело немедленный эффект. Другими словами, отмена токена должна привести к завершению асинхронного перечисления за считанные миллисекунды. Приведем практический пример, позволяющий различить желаемое и нежелательное поведение:

var cts = new CancellationTokenSource(500);
var stopwatch = Stopwatch.StartNew();
try
{
    await foreach (var i in GetSequence().WithEnforcedCancellation(cts.Token))
    {
        Console.WriteLine($"{stopwatch.Elapsed:m':'ss'.'fff} > {i}");
    }
}
catch (OperationCanceledException)
{
    Console.WriteLine($"{stopwatch.Elapsed:m':'ss'.'fff} > Canceled");
}

Вывод (желательно):

0: 00.242> 1
0: 00.467> 2
0: 00.500> Отменено

Вывод (нежелательный):

0: 00.242> 1
0: 00.467> 2
0: 00.707>Отменено

GetSequence - это тот же метод, что и в первоначальном примере, который транслирует одно число каждые 200 мсек. Этот метод не поддерживает отмену, и предпосылка в том, что мы не можем это изменить. WithEnforcedCancellation - это необходимый метод расширения, который должен решить эту проблему.

Ответы [ 2 ]

4 голосов
/ 04 октября 2019

IAsyncEnumerable явно предусматривает этот механизм с атрибутом EnumeratorCancellation:

static async IAsyncEnumerable<int> GetSequence([EnumeratorCancellation] CancellationToken ct = default) {
    for (int i = 1; i <= 10; i++) {
        ct.ThrowIfCancellationRequested();
        await Task.Delay(200);    // or `Task.Delay(200, ct)` if this wasn't an example
        yield return i;
    }
}

Фактически, компилятор достаточно полезен, чтобы выдать предупреждение, если вы передаете методу параметр CancellationToken,но не добавляйте атрибут.

Обратите внимание, что токен, переданный .WithCancellation, переопределит любой локальный токен, переданный методу. В спецификациях есть подробности об этом.

Конечно, это все равно будет работать, только если перечисление действительно принимает CancellationToken - но тот факт, что отмена действительно работает, только если выполняется совместноверно для любой async работы. Ответ Йелдара хорош для "принудительного" преобразования некоторой меры отмены в перечислимое, которое его не поддерживает, но предпочтительным решением должно быть изменение перечисления для поддержки отмены самостоятельно - компилятор делает все, чтобывыручить вас.

2 голосов
/ 04 октября 2019

Вы можете просто извлечь свою логику в метод расширения, подобный этому:

public static async IAsyncEnumerable<T> WithEnforcedCancellation<T>(
    this IAsyncEnumerable<T> source, CancellationToken cancellationToken)
{
    if (source == null)
        throw new ArgumentNullException(nameof(source));

    cancellationToken.ThrowIfCancellationRequested();

    await foreach (var item in source)
    {
        cancellationToken.ThrowIfCancellationRequested();
        yield return item;
    }
}
...