Как асинхронные потоки сравниваются с реактивным расширением? - PullRequest
2 голосов
/ 20 октября 2019

Как сравнить следующие два? Является ли Rx более мощным?

Реактивное расширение:

var observable = Observable.Create<char>(async (observer, cancel) =>
{
    while (true)
    {
        string line = await sr.ReadLineAsync();
        if (line == null)
            break;
        observer.OnNext(line);
    }
});

observable.Subscribe(
    c => Console.WriteLine(c.ToString()),
    () => end.Dispose());

Асинхронные потоки:

public async void Run(string path)
{
    await foreach (var line in TestAsync())
    {
        Console.WriteLine(line);
    }
}

private async IAsyncEnumerable<string> TestAsync()
{
    while (true)
    {
        string line = await sr.ReadLineAsync();
        if (line == null)
            break;
        yield return line;
    }
}

1 Ответ

2 голосов
/ 22 октября 2019

Две функции работают вместе. PS: Забудьте о async streams, подумайте о await foreach.

Асинхронные потоки

Асинхронные потоки - это (относительно) функция низкого уровня, которая допускает асинхронностьитерация . Сам по себе он не предлагает никаких других возможностей, таких как фильтрация, агрегация и т. Д. Он основан на получении, в то время как Rx основан на push.

Вы можете использовать операторы LINQ в асинхронном потоке через System.Linq. Библиотека Async найдена в ..... репозитории ReacticeX.NET Github . Это быстро, но не предлагает функциональность обработки событий Rx.

Нет, например, понятия времени, тем более способа использования пользовательского планировщика. Нет подписок, нет ошибок. GroupBy будет потреблять весь исходный элемент и излучать элементы группы как отдельные экземпляры IAsyncEnumerable, в то время как GroupBy Rx будет излучать отдельные Observables для каждой группы.

В примере вопроса IAsyncEnumerable является естественным соответствием, поскольку здесь не задействована логика событий, а просто выполняется итерация по асинхронному итератору.

Если в примере была предпринята попытка опроса, например, удаленной службы, и обнаружения всплесков отказов (т. Е. Больше отказов за интервал, чем пороговое значение), то IAsyncEnumerable был бы неуместным, поскольку он блокировал бы ожидание всех ответов. Фактически, мы вообще не могли агрегировать события за раз.

Threading

Нет, на самом деле - вызов IAsyncEnumerable или await foreach не указывает, как событияпроизводится или потребляется. Если мы хотим использовать отдельное задание для обработки элемента, мы должны создать его самостоятельно, например:

public async Task Run(string path)
{
    await foreach (var line in LoadStockTrades())
    {
        var result = await Task.Run(()=>AnalyzeTrade(line));
        Console.WriteLine($"{result} : {line});
    }
}

Reactive Extensions

Reactive Extensions является высокимбиблиотека уровней, которая работает с потоками событий. Он основан на push, он понимает время, но он также медленнее, чем низкоуровневые конструкции, такие как Async Streams или Channels.

В примере вопроса Rx был бы излишним. Опрос и обнаружение пиков, хотя и прост, с несколькими вариантами управления окнами.

System.Linq.Async может создать Observable из IAsyncEnumerable с ToObservable , что означает, что IAsyncEnumerable можно использовать в качестве источникадля Rx.

Threading

По умолчанию Rx является однопоточным, что имеет смысл для его основного сценария - обработки потока событий.

С другой стороны, Rx позволяет издателю, подписчику и операторам работать в одинаковых или отдельных потоках. На языках, которые не имеют async/await или DataFlow (например, Java, JavaScript), Rx используется для эмуляции параллельных конвейеров обработки при запуске издателя и подписчиков в разных потоках.

...