У меня есть последовательность выбора на горячем наблюдаемом источнике, и я пытаюсь выяснить, какой из них «замедляет меня» (то есть источник имеет внутренний буфер и говорит мне, что ему нужно отбрасывать предметы). И я также хочу представить возможность выполнения работы «параллельно» (т.е. Transform1 может начать работу на item(t + 1)
до завершения Transform2 item(t)
).
source.Select(a => Transform1(a))
.Select(b => Transform2(a))
.Select(c => Transform3(a))
В настоящее время у меня есть это, которое я не уверен, что запустил в производство, но, по крайней мере, предоставляет информацию во время разработки:
public static IObservable<T> Buffered<T>(this IObservable<T> source, Action<T, int> log,
IScheduler scheduler)
{
return Observable.Create<T>(ob =>
{
int count = 0;
return source
.Do(_ => Interlocked.Increment(ref count))
.Do(t => log(t, count)) //log on fill
.ObserveOn(scheduler)
.Do(_ => Interlocked.Decrement(ref count))
.Do(t => log(t, count)) //log on empty...
.Subscribe(ob);
});
}
Итак, мой оригинальный код:
source.Buffered((_, n) => Log($"Transform1 buffer: {n}"))
.Select(a => Transform1(a))
.Buffered((_, n) => Log($"Transform2 buffer: {n}"))
.Select(b => Transform2(a))
.Buffered((_, n) => Log($"Transform3 buffer: {n}"))
.Select(c => Transform3(a))
И я вижу, что все резервное копирование перед Transform2 (например).
- Я пропускаю что-то, что уже существует?
- Есть ли лучший способ сделать это?