Как остановить распространение асинхронного потока (IAsyncEnumerable) - PullRequest
0 голосов
/ 03 октября 2019

У меня есть метод, который принимает IAsyncEnumerable в качестве аргумента и возвращает также IAsyncEnumerable. Он вызывает веб-метод для каждого элемента во входном потоке и распространяет результат в выходной поток. У меня вопрос, как я могу получить уведомление, если вызывающая сторона моего метода перестала перечислять поток вывода, чтобы я мог прекратить перечислять поток ввода внутри моего метода? Кажется, что я должен быть в состоянии получить уведомление, потому что вызывающий по умолчанию утилизирует IAsyncEnumerator, который получает мой метод. Есть ли встроенный механизм, который генерирует такое уведомление для сгенерированных компилятором асинхронных методов? Если нет, то как проще всего реализовать альтернативу?

Пример. Веб-метод проверяет, является ли URL действительным или нет. Предоставляется бесконечный поток URL-адресов, но вызывающая сторона прекращает перечислять результаты, когда найдено более 2 недействительных URL-адресов:

var invalidCount = 0;
await foreach (var result in ValidateUrls(GetMockUrls()))
{
    Console.WriteLine($"Url {result.Url} is "
        + (result.IsValid ? "OK" : "Invalid!"));
    if (!result.IsValid) invalidCount++;
    if (invalidCount > 2) break;
}
Console.WriteLine($"--Async enumeration finished--");
await Task.Delay(2000);

Генератор URL-адресов. Один URL генерируется каждые 300 мсек.

private static async IAsyncEnumerable<string> GetMockUrls()
{
    int index = 0;
    while (true)
    {
        await Task.Delay(300);
        yield return $"https://mock.com/{++index:0000}";
    }
}

Валидатор URL. Существует требование, чтобы входной поток перечислялся с нетерпением, поэтому два асинхронных рабочих процесса выполняются параллельно. Первый рабочий процесс вставляет URL-адреса в очередь, а второй рабочий процесс выбирает URL-адреса один за другим и проверяет их. A BufferBlock используется в качестве асинхронной очереди.

private static async IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
    this IAsyncEnumerable<string> urls)
{
    var buffer = new System.Threading.Tasks.Dataflow.BufferBlock<string>();
    _ = Task.Run(async () =>
    {
        await foreach (var url in urls)
        {
            Console.WriteLine($"Url {url} received");
            await buffer.SendAsync(url);
        }
        buffer.Complete();
    });

    while (await buffer.OutputAvailableAsync() && buffer.TryReceive(out var url))
    {
        yield return (url, await MockValidateUrl(url));
    }
}

Уточнение: очередь является обязательной, и удаление ее не является вариантом. Это важный компонент этой проблемы.

Валидатор одного URL. Процесс проверки длится в среднем 300 мсек.

private static Random _random = new Random();
private static async Task<bool> MockValidateUrl(string url)
{
    await Task.Delay(_random.Next(100, 600));
    return _random.Next(0, 2) != 0;
}

Вывод:

Url https://mock.com/0001 received
Url https://mock.com/0001 is Invalid!
Url https://mock.com/0002 received
Url https://mock.com/0003 received
Url https://mock.com/0002 is OK
Url https://mock.com/0004 received
Url https://mock.com/0003 is Invalid!
Url https://mock.com/0005 received
Url https://mock.com/0004 is OK
Url https://mock.com/0005 is OK
Url https://mock.com/0006 received
Url https://mock.com/0006 is Invalid!
--Async enumeration finished--
Url https://mock.com/0007 received
Url https://mock.com/0008 received
Url https://mock.com/0009 received
Url https://mock.com/0010 received
Url https://mock.com/0011 received
Url https://mock.com/0012 received
...

Проблема заключается в том, что URL-адреса все еще генерируются и принимаются после того, как вызывающий / клиент завершил асинхронное перечисление. Я хотел бы исправить это, чтобы после консоли --Async enumeration finished--.

больше не появлялось сообщений в консоли.

Ответы [ 2 ]

2 голосов
/ 03 октября 2019

Редактировать

Обсуждение будет легче с соответствующим примером. Проверка URL-адресов не так уж дорога. Что если вам нужно набрать, например, 100 URL-адресов и выбрать первые 3 ответа?

В этом случае и рабочий, и буфер имеют смысл.

Редактировать 2

Один из комментариев добавляет дополнительную сложность - задачи выполняются одновременно и результаты должны отправляться по мере их поступления.


Для начала ValidateUrl можно переписать как метод итератора:

private static async IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
    this IAsyncEnumerable<string> urls)
{
    await foreach (var url in urls)
    {
        Console.WriteLine($"Url {url} received");
        var isValid=await MockValidateUrl(url);
        yield return (url, isValid);
    }
}

Нет необходимости в рабочей задаче, поскольку все методы асинхронны. Метод итератора не будет выполняться, пока потребитель не запросит результата. Даже если MockValidateUrl делает что-то дорогое, он может использовать Task.Run сам или быть завернутым в Task.Run. Тем не менее, это вызовет немало задач.

Для полноты картины вы можете добавить CancellationToken и ConfigureAwait(false):

public static async IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
       IAsyncEnumerable<string> urls, 
       [EnumeratorCancellation]CancellationToken token=default)
{
    await foreach(var url in urls.WithCancellation(token).ConfigureAwait(false))
    {
        var isValid=await MockValidateUrl(url).ConfigureAwait(false);
        yield return (url,isValid);
    }
}

В любом случае, как только вызывающая сторона остановитсяитерация, ValidateUrls остановится.

Буферизация

Буферизация является проблемой - независимо от того, как она запрограммирована, рабочий не остановится, пока буфер не заполнится. Размер буфера - это количество итераций, которые рабочий должен выполнить, прежде чем он поймет, что ему нужно остановиться. Это отличный пример для канала (да, опять!):

public static IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
        IAsyncEnumerable<string> urls,CancellationToken token=default)
{
    var channel=Channel.CreateBounded<(string Url, bool IsValid)>(2);
    var writer=channel.Writer;
    _ = Task.Run(async ()=>{
                await foreach(var url in urls.WithCancellation(token))
                {
                    var isValid=await MockValidateUrl(url);
                    await writer.WriteAsync((url,isValid));
                }
        },token)
        .ContinueWith(t=>writer.Complete(t.Exception));        
    return channel.Reader.ReadAllAsync(token);
}

Хотя лучше обойти ChannelReaders, а не IAsyncEnumerables. По крайней мере, асинхронный перечислитель не создается, пока кто-то не попытается прочитать из ChannelReader. Кроме того, проще создавать конвейеры в качестве методов расширения:

public static ChannelReader<(string Url, bool IsValid)> ValidateUrls(
        this ChannelReader<string> urls,int capacity,CancellationToken token=default)
{
    var channel=Channel.CreateBounded<(string Url, bool IsValid)>(capacity);
    var writer=channel.Writer;
    _ = Task.Run(async ()=>{
                await foreach(var url in urls.ReadAllAsync(token))
                {
                    var isValid=await MockValidateUrl(url);
                    await writer.WriteAsync((url,isValid));
                }
        },token)
        .ContinueWith(t=>writer.Complete(t.Exception));        
    return channel.Reader;
}

Этот синтаксис позволяет быстро создавать конвейеры. Допустим, у нас есть этот вспомогательный метод для преобразования IEnumerables в channesl (или IAsyncEnumerables):

public static ChannelReader<T> AsChannel(
         IEnumerable<T> items)
{
    var channel=Channel.CreateUnbounded();        
    var writer=channel.Writer;
    foreach(var item in items)
    {
        channel.TryWrite(item);
    }
    return channel.Reader;
}

Мы можем написать:

var pipeline=urlList.AsChannel()     //takes a list and writes it to a channel
                    .ValidateUrls();

await foreach(var (url,isValid) in pipeline.ReadAllAsync())
{
   //Use the items here
}

Параллельные вызовы с немедленным распространением

С каналами это просто, хотя в это время работнику нужно запускать все задачи одновременно. По сути, нам нужно несколько работников. Это не то, что можно сделать с помощью только IAsyncEnumerable.

Прежде всего, если мы хотим использовать, например, 5 одновременных задач для обработки входных данных, мы могли бы написать

    var tasks = Enumerable.Range(0,5).
                  .Select(_ => Task.Run(async ()=>{
                                 /// 
                             },token));
    _ = Task.WhenAll(tasks)(t=>writer.Complete(t.Exception));        

вместо:

    _ = Task.Run(async ()=>{
        /// 
        },token)
        .ContinueWith(t=>writer.Complete(t.Exception));        

Использование большого количества работников может быть достаточно. Я не уверен, что IAsyncEnumerable может использоваться несколькими работниками, и я не очень хочу это выяснить.

Преждевременное аннулирование

Все вышеперечисленные работыесли клиент потребляет все результаты. Чтобы остановить обработку, например, после первых 5 результатов, нам нужен CancellationToken:

var cts=new CancellationTokenSource();

var pipeline=urlList.AsChannel()     //takes a list and writes it to a channel
                    .ValidateUrls(cts.Token);

int i=0;
await foreach(var (url,isValid) in pipeline.ReadAllAsync())
{
    //Break after 3 iterations
    if(i++>2)
    {
        break;
    }
    ....
}

cts.Cancel();

Этот код сам по себе может быть извлечен в методе, который получает ChannelReader и, в данном случае, CancellationTokenSource:

static async LastStep(this ChannelReader<(string Url, bool IsValid)> input,CancellationTokenSource cts)
    {
    int i=0;
    await foreach(var (url,isValid) in pipeline.ReadAllAsync())
    {
        //Break after 3 iterations
        if(i++>2)
        {
            break;
        }
        ....
    }

    cts.Cancel();        
}

И конвейер становится:

var cts=new CancellationTokenSource();

var pipeline=urlList.AsChannel()     
                    .ValidateUrls(cts.Token)
                    .LastStep(cts);
0 голосов
/ 04 октября 2019

Полагаю, мне следует ответить на свой вопрос, поскольку теперь у меня есть достаточно простое общее решение.

Обновление: Я отказываюсь от своего предыдущего ответа, поскольку обнаружил гораздо более простое решение. Это смущающе просто на самом деле. Все, что мне нужно сделать, это заключить выходную часть итератора ValidateUrls в блок try-finally. Блок finally будет выполняться в каждом случае либо вызывающим абонентом, который обычно завершает перечисление, либо ненормально break или исключением. Так вот как я могу получить искомое уведомление, отменив CancellationTokenSource на finally:

private static async IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
    this IAsyncEnumerable<string> urls)
{
    var buffer = new System.Threading.Tasks.Dataflow.BufferBlock<string>();
    var completionCTS = new CancellationTokenSource();
    _ = Task.Run(async () =>
    {
        await foreach (var url in urls)
        {
            if (completionCTS.IsCancellationRequested) break;
            Console.WriteLine($"Url {url} received");
            await buffer.SendAsync(url);
        }
        buffer.Complete();
    });

    try
    {
        while (await buffer.OutputAvailableAsync() && buffer.TryReceive(out var url))
        {
            yield return (url, await MockValidateUrl(url));
        }
    }
    finally // This runs when the caller completes the enumeration
    {
        completionCTS.Cancel();
    }
}

Я, вероятно, должен отметить, что асинхронный итератор, который не поддерживает отмену, нехорошая практика. Без этого у вызывающего абонента нет простого способа остановить ожидание между потреблением одного значения и следующего. Таким образом, лучшая подпись для моего метода должна быть:

private static async IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
    this IAsyncEnumerable<string> urls,
    [EnumeratorCancellation]CancellationToken cancellationToken = default)
{

. Затем токен можно было бы передать ожидаемым методам цикла выдачи, OutputAvailableAsync и MockValidateUrl.

С точки зрения вызывающего абонента токен может быть передан либо напрямую, либо с помощью цепочки метода расширения WithCancellation.

await foreach (var result in ValidateUrls(GetMockUrls()).WithCancellation(token))
...