Я бы сказал, что TPL Dataflow охватывает специализированное подмножество функций в Rx. Поток данных предназначен для обработки данных, которая может занимать измеримое количество времени, тогда как Rx - для событий, таких как положение мыши, состояния ошибок и т. Д., Где время обработки незначительно.
Пример: ваш обработчик «подписки» асинхронный, и вы хотите, чтобы в каждый момент времени было не более одного исполнителя. С Rx вы должны блокировать, другого пути нет, потому что Rx асинхронно-независимый и во многих местах не угрожает асинхронности особым образом.
.Subscribe(myAsyncHandler().Result)
Если вы не блокируете, то Rx будет считать, что действие завершено, пока обработчик все еще выполняется асинхронно.
Вы можете подумать, что если вы делаете
.ObserveOn(Scheduler.EventLoopSchedule)
чем проблема решена. Но это нарушит ваш рабочий процесс .Complete (), потому что Rx будет думать, что это сделано, как только запланирует выполнение, и вы выйдете из приложения, не ожидая завершения асинхронной операции.
Если вы хотите разрешить не более 4 одновременных асинхронных задач, чем Rx не предлагает ничего из коробки. Возможно, вы можете что-то взломать, внедрив собственный планировщик, буфер и т. Д.
TPL Dataflow предлагает очень хорошее решение в ActionBlock. Он может ограничивать одновременные действия до определенного числа, и он понимает асинхронные операции, поэтому вызов Complete () и ожидание Completed сделают именно то, что вы ожидаете: ожидание завершения всех выполняющихся асинхронных задач.
Еще одна особенность TPL - это «противодавление». Допустим, вы обнаружили ошибку в своей процедуре обработки и вам необходимо пересчитать данные за последний месяц. Если вы подписываетесь на свой источник, используя Rx, и ваш конвейер содержит неограниченные буферы или ObserveOn, то вам не хватит памяти в считанные секунды, потому что источник будет продолжать читать быстрее, чем может обработать обработка. Даже если вы реализуете блокировку потребителя, ваш источник может страдать от блокировки вызовов, например, если источник асинхронный. В TPL вы можете реализовать источник как
while(...)
await actionBlock.SendAsync(msg)
, который еще не блокирует источник, будет ожидать, пока обработчик перегружен.
В целом я обнаружил, что Rx хорошо подходит для действий, которые являются временными и вычислительными. Если время обработки становится значительным, вы попадаете в мир странных побочных эффектов и эзотерической отладки.
Хорошая новость заключается в том, что блоки потока данных TPL прекрасно работают с Rx. Они имеют адаптеры AsObserver / AsObservable, и вы можете при необходимости прикрепить их к середине конвейера Rx. Но у Rx гораздо больше шаблонов и вариантов использования. Поэтому мое практическое правило - начинать с Rx и добавлять поток данных TPL по мере необходимости.