Две функции работают вместе. PS: Забудьте о async streams
, подумайте о await foreach
.
Асинхронные потоки
Асинхронные потоки - это (относительно) функция низкого уровня, которая допускает асинхронностьитерация . Сам по себе он не предлагает никаких других возможностей, таких как фильтрация, агрегация и т. Д. Он основан на получении, в то время как Rx основан на push.
Вы можете использовать операторы LINQ в асинхронном потоке через System.Linq. Библиотека Async найдена в ..... репозитории ReacticeX.NET Github . Это быстро, но не предлагает функциональность обработки событий Rx.
Нет, например, понятия времени, тем более способа использования пользовательского планировщика. Нет подписок, нет ошибок. GroupBy будет потреблять весь исходный элемент и излучать элементы группы как отдельные экземпляры IAsyncEnumerable
, в то время как GroupBy Rx будет излучать отдельные Observables для каждой группы.
В примере вопроса IAsyncEnumerable является естественным соответствием, поскольку здесь не задействована логика событий, а просто выполняется итерация по асинхронному итератору.
Если в примере была предпринята попытка опроса, например, удаленной службы, и обнаружения всплесков отказов (т. Е. Больше отказов за интервал, чем пороговое значение), то IAsyncEnumerable был бы неуместным, поскольку он блокировал бы ожидание всех ответов. Фактически, мы вообще не могли агрегировать события за раз.
Threading
Нет, на самом деле - вызов IAsyncEnumerable или await foreach
не указывает, как событияпроизводится или потребляется. Если мы хотим использовать отдельное задание для обработки элемента, мы должны создать его самостоятельно, например:
public async Task Run(string path)
{
await foreach (var line in LoadStockTrades())
{
var result = await Task.Run(()=>AnalyzeTrade(line));
Console.WriteLine($"{result} : {line});
}
}
Reactive Extensions
Reactive Extensions является высокимбиблиотека уровней, которая работает с потоками событий. Он основан на push, он понимает время, но он также медленнее, чем низкоуровневые конструкции, такие как Async Streams или Channels.
В примере вопроса Rx был бы излишним. Опрос и обнаружение пиков, хотя и прост, с несколькими вариантами управления окнами.
System.Linq.Async может создать Observable из IAsyncEnumerable с ToObservable , что означает, что IAsyncEnumerable можно использовать в качестве источникадля Rx.
Threading
По умолчанию Rx является однопоточным, что имеет смысл для его основного сценария - обработки потока событий.
С другой стороны, Rx позволяет издателю, подписчику и операторам работать в одинаковых или отдельных потоках. На языках, которые не имеют async/await
или DataFlow (например, Java, JavaScript), Rx используется для эмуляции параллельных конвейеров обработки при запуске издателя и подписчиков в разных потоках.