Просто для полноты здесь приведена альтернативная (более компактная) версия метода Drain (), предложенная Ричардом:
public static IObservable<T2> SelectManySequential<T1, T2>(
this IObservable<T1> source,
Func<T1, IObservable<T2>> selector
)
{
return source
.Select(x => Observable.Defer<T2>(() => selector(x)))
.Concat();
}
Смотрите тему Drain + SelectMany =? в форуме Rx.
Обновление:
Я понял, что перегрузка Concat (), которую я использовал, была одним из моих личных расширений Rx, которые (еще) являются частью фреймворка. Я прошу прощения за эту ошибку .. Конечно, это делает мое решение менее элегантным, чем я думал.
Тем не менее, для полноты я публикую здесь свою перегрузку метода расширения Conact ():
public static IObservable<T> Concat<T>(this IObservable<IObservable<T>> source)
{
return Observable.CreateWithDisposable<T>(o =>
{
var lockCookie = new Object();
bool completed = false;
bool subscribed = false;
var waiting = new Queue<IObservable<T>>();
var pendingSubscription = new MutableDisposable();
Action<Exception> errorHandler = e =>
{
o.OnError(e);
pendingSubscription.Dispose();
};
Func<IObservable<T>, IDisposable> subscribe = null;
subscribe = (ob) =>
{
subscribed = true;
return ob.Subscribe(
o.OnNext,
errorHandler,
() =>
{
lock (lockCookie)
{
if (waiting.Count > 0)
pendingSubscription.Disposable = subscribe(waiting.Dequeue());
else if (completed)
o.OnCompleted();
else
subscribed = false;
}
}
);
};
return new CompositeDisposable(pendingSubscription,
source.Subscribe(
n =>
{
lock (lockCookie)
{
if (!subscribed)
pendingSubscription.Disposable = subscribe(n);
else
waiting.Enqueue(n);
}
},
errorHandler
, () =>
{
lock (lockCookie)
{
completed = true;
if (!subscribed)
o.OnCompleted();
}
}
)
);
});
}
А теперь бей себя собственным оружием:
Тот же самый метод Concat () мог бы быть написан гораздо более элегантно блестящим способом Ричарда Сзалая:
public static IObservable<T> Concat<T>(this IObservable<IObservable<T>> source)
{
return Observable.Defer(() =>
{
BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit());
return source
.Zip(queue, (v, q) => v)
.SelectMany(v =>
v.Do(_ => { }, () => queue.OnNext(new Unit()))
);
});
}
Так что кредит принадлежит Ричарду. : -)