Фабрика для IAsyncEnumerable или IAsyncEnumerator - PullRequest
1 голос
/ 01 мая 2020

Мне интересно, есть ли способ создать либо IAsyncEnumerable<T>, либо IAsyncEnumerator<T> через объект-источник, скорее всего, как TaskCompletionSource позволяет выполнять задачи. В частности, TaskCompletionSource может передаваться как любой другой параметр.

Может быть что-то вроде этого:

public class AsyncEnumerables {

    public Task HandlerTask { get; set; }

    public async Task<string> ParentMethod() {
        var source = new AsyncEnumerableSource<int>();
        IAsyncEnumerable asyncEnumerable = source.GetAsyncEnumerable();
        HandlerTask = Task.Run(() => handleAsyncResultsAsTheyHappen(asyncEnumerable));
        int n = await someOtherTask();
        source.YieldReturn(n);
        var r = await ChildMethod(source);
        source.Complete();  // this call would cause the HandlerTask to complete.
        return r;
    }

    private async Task<string> ChildMethod(AsyncEnumerableSource<int> source) {
        source.YieldReturn(5);
        await SomeOtherCall();
        source.YieldReturn(10);
        return "hello";
    }
}

С приведенным выше кодом задача handleAsyncResultsAsTheyHappen будет видеть все полученные значения перешел в YieldReturn. Таким образом, он будет видеть n из приведенного выше кода, а также 5 и 10 из ChildMethod.

Ответы [ 2 ]

1 голос
/ 01 мая 2020

Вам гораздо лучше, если вы сможете структурировать свой код так, чтобы использовать преимущества yield return и await foreach. Например, этот код делает почти то же самое:

public async Task Consume()
{
    var source = ParentMethod();
    HandlerTask = Task.Run(async () => { await foreach (var item in source) { Console.WriteLine(item); } });
}

public async IAsyncEnumerable<int> ParentMethod()
{
    await Task.Yield();
    yield return 13;
    await foreach (var item in ChildMethod())
        yield return item;
}

private async IAsyncEnumerable<int> ChildMethod()
{
    yield return 5;
    await Task.Yield();
    yield return 10;
}

Однако, если вам действительно нужен «асинхронный c перечислимый источник», вам необходимо сначала распознать одну вещь. TaskCompletionSource<T> содержит результаты, то есть T (или исключение). Это действует как контейнер. Результат может быть установлен до того, как задача будет ждать. То же самое относится и к «перечисляемому источнику asyn c» - вам нужно, чтобы он мог хранить результаты до того, как из него будут взяты какие-либо элементы. «Асин c перечислимый источник» должен содержать несколько результатов - в этом случае коллекция .

Итак, что вы на самом деле запрашиваете, так это msgstr "коллекция, которую можно использовать как асинхронный перечислимый". Здесь есть несколько возможностей, но я бы порекомендовал Channel :

public async Task<string> ParentMethod()
{
  var source = Channel.CreateUnbounded<int>();
  var sourceWriter = source.Writer;
  IAsyncEnumerable<int> asyncEnumerable = source.Reader.ReadAllAsync();
  HandlerTask = Task.Run(async () => { await foreach (var item in asyncEnumerable) Console.WriteLine(item); });
  await Task.Yield();
  await sourceWriter.WriteAsync(13);
  var r = await ChildMethod(sourceWriter);
  sourceWriter.Complete();
  return r;
}

private async Task<string> ChildMethod(ChannelWriter<int> sourceWriter)
{
  await sourceWriter.WriteAsync(5);
  await Task.Yield();
  await sourceWriter.WriteAsync(10);
  return "hello";
}
0 голосов
/ 01 мая 2020

AFAIK. Платформа. NET не имеет встроенного класса AsyncEnumerableSource, но ее легко реализовать с помощью System.Reactive и System.Interactive.Asyn * 1028. * библиотек. Библиотека System.Reactive содержит класс Subject, который является комбинацией IObservable и IObserver. Это удобный класс, потому что вы можете отправлять уведомления на интерфейс IObserver и независимо подписываться любое количество раз на интерфейс IObservable для получения этих уведомлений обратно. На самом деле подписка вручную не требуется, поскольку библиотека System.Interactive.Async содержит удобный метод расширения ToAsyncEnumerable, который автоматически преобразует IObservable в IAsyncEnumerable.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Subjects;

public class AsyncEnumerableSource<T>
{
    private readonly Subject<T> _subject = new Subject<T>();

    public IAsyncEnumerable<T> GetAsyncEnumerable() => _subject.ToAsyncEnumerable();
    public void YieldReturn(T value) => _subject.OnNext(value);
    public void Complete() => _subject.OnCompleted();
    public void Fault(Exception ex) => _subject.OnError(ex);
}

Эта реализация отправит подписчики только уведомления, которые произошли после их подписки. Если вы хотите, чтобы поздние участники получили ранние сообщения, вы можете заменить Subject на ReplaySubject. Он буферизует получаемые уведомления, поэтому идет с соображениями использования памяти: он принимает аргумент int bufferSize в своем конструкторе.

...