Есть ли более простой способ сделать IObservable асинхронно зависимым от другого IObservable? - PullRequest
0 голосов
/ 08 февраля 2011

Я новичок в RX, и у меня хорошо работает мой желаемый сценарий, но мне кажется, что должен быть более простой или более элегантный способ добиться этого.У меня есть IObservable<T>, и я хочу подписаться на него таким образом, чтобы я в итоге получил IObservable<U>,, вызвав асинхронную операцию, которая генерирует U для каждого T, который он видит.

То, что я имею до сих пор (это прекрасно работает, но кажется громоздким), использует промежуточный поток событий и идет примерно так:

public class Converter {
  public event EventHandler<UArgs> UDone;
  public IConnectableObservable<U> ToUs(IObservable<T> ts) {
    var us = Observable.FromEvent<UArgs>(this, "UDone").Select(e => e.EventArgs.U).Replay();
    ts.Subscribe(t => Observable.Start(() => OnUDone(new U(t))));
    return us;
  }
  private void OnUDone(U u) {
    var uDone = UDone;
    if (uDone != null) {
      uDone(this, u);
    }
  }
}

...

var c = new Converter();
IConnectableObservable<T> ts = ...;
var us = c.ToUs(ts);
us.Connect();

...

Я уверен, что мне очень не хватаетболее простой способ сделать это ...

Ответы [ 2 ]

1 голос
/ 08 февраля 2011

SelectMany должен делать то, что вам нужно, чтобы выровнять IO<IO<T>>

Observable.Range(1, 10)
        .Select(ii => Observable.Start(() => 
             string.Format("{0} {1}", ii, Thread.CurrentThread.ManagedThreadId)))
        .SelectMany(id=>id)
        .Subscribe(Console.WriteLine);
0 голосов
/ 09 февраля 2011

Это именно то, для чего SelectMany предназначен:

IObservable<int> ts

IObservable<string> us = ts.SelectMany(t => StartAsync(t));

us.Subscribe(u => 
    Console.WriteLine("StartAsync completed with {0}", u));

...

private IObservable<string> StartAsync(int t)
{
    return Observable.Return(t.ToString())
        .Delay(TimeSpan.FromSeconds(1));
}

Имейте в виду, что если StartAsync имеет переменное время завершения, вы можете получить выходные значения в порядке, отличном от входных значений.

...