Я новичок в RX, и у меня хорошо работает мой желаемый сценарий, но мне кажется, что должен быть более простой или более элегантный способ добиться этого.У меня есть IObservable<T>
, и я хочу подписаться на него таким образом, чтобы я в итоге получил IObservable<U>,
, вызвав асинхронную операцию, которая генерирует U для каждого T, который он видит.
То, что я имею до сих пор (это прекрасно работает, но кажется громоздким), использует промежуточный поток событий и идет примерно так:
public class Converter {
public event EventHandler<UArgs> UDone;
public IConnectableObservable<U> ToUs(IObservable<T> ts) {
var us = Observable.FromEvent<UArgs>(this, "UDone").Select(e => e.EventArgs.U).Replay();
ts.Subscribe(t => Observable.Start(() => OnUDone(new U(t))));
return us;
}
private void OnUDone(U u) {
var uDone = UDone;
if (uDone != null) {
uDone(this, u);
}
}
}
...
var c = new Converter();
IConnectableObservable<T> ts = ...;
var us = c.ToUs(ts);
us.Connect();
...
Я уверен, что мне очень не хватаетболее простой способ сделать это ...