Размер буфера регистрации в Rx - PullRequest
0 голосов
/ 24 июня 2019

У меня есть последовательность выбора на горячем наблюдаемом источнике, и я пытаюсь выяснить, какой из них «замедляет меня» (то есть источник имеет внутренний буфер и говорит мне, что ему нужно отбрасывать предметы). И я также хочу представить возможность выполнения работы «параллельно» (т.е. Transform1 может начать работу на item(t + 1) до завершения Transform2 item(t)).

source.Select(a => Transform1(a))
      .Select(b => Transform2(a))
      .Select(c => Transform3(a))

В настоящее время у меня есть это, которое я не уверен, что запустил в производство, но, по крайней мере, предоставляет информацию во время разработки:

public static IObservable<T> Buffered<T>(this IObservable<T> source, Action<T, int> log, 
                                                                   IScheduler scheduler)
{
    return Observable.Create<T>(ob =>
    {
        int count = 0;
        return source
            .Do(_ => Interlocked.Increment(ref count))
            .Do(t => log(t, count)) //log on fill
            .ObserveOn(scheduler)
            .Do(_ => Interlocked.Decrement(ref count))
            .Do(t => log(t, count)) //log on empty...
            .Subscribe(ob);
    });
}

Итак, мой оригинальный код:

source.Buffered((_, n) => Log($"Transform1 buffer: {n}"))
      .Select(a => Transform1(a))
      .Buffered((_, n) => Log($"Transform2 buffer: {n}"))
      .Select(b => Transform2(a))
      .Buffered((_, n) => Log($"Transform3 buffer: {n}"))
      .Select(c => Transform3(a))

И я вижу, что все резервное копирование перед Transform2 (например).

  • Я пропускаю что-то, что уже существует?
  • Есть ли лучший способ сделать это?
...