Я думаю, у меня есть хорошее чистое решение для вас.
Сначала вернитесь к использованию Func<int, int>
- его легко превратить в Func<int, IObservable<int>>
с помощью Observable.FromAsyncPattern
.
Я использовал это для тестирования:
Func<int, int> mkInt = ts =>
{
Thread.Sleep(100);
return ts + 1;
};
Теперь вот деньги:
Func<int, Func<int, int>, IObservable<int>> mkInts = (i0, fn) =>
Observable.Create<int>(o =>
{
var ofn = Observable
.FromAsyncPattern<int, int>(
fn.BeginInvoke,
fn.EndInvoke);
var s = new Subject<int>();
var q = s.Select(x => ofn(x)).Switch();
var r = new CompositeDisposable(new IDisposable[]
{
q.Subscribe(s),
s.Subscribe(o),
});
s.OnNext(i0);
return r;
});
Итеративная функция превращается в асинхронную наблюдаемую.
Переменная q
подает значения от субъекта в наблюдаемую итерационную функцию и выбирает вычисленную наблюдаемую. Метод Switch
выравнивает результат и обеспечивает правильную очистку каждого вызова наблюдаемой итерационной функции.
Кроме того, использование CompositeDisposable
позволяет использовать две подписки как одну. Очень аккуратно!
Его легко использовать так:
using (mkInts(7, mkInt).Subscribe(Console.WriteLine))
{
Console.ReadLine();
}
Теперь у вас есть полностью параметризованная версия функции генератора. Хорошо, а?