Короткий ответ: Rx Framework не поддерживает генерацию наблюдаемых объектов с использованием рекурсивного шаблона, подобного этому, поэтому это нелегко сделать. Операция Combine
, используемая для последовательностей F #, нуждается в особой обработке, которую не обеспечивают наблюдаемые. Rx Framework, вероятно, ожидает, что вы сгенерируете наблюдаемые с помощью Observable.Generate
, а затем будете использовать компилятор запросов LINQ / F # для их обработки.
Во всяком случае, вот некоторые мысли -
Прежде всего, вам необходимо заменить Observable.merge
на Observable.Concat
. Первый запускает обе наблюдаемые параллельно, а второй сначала выдает все значения из первой наблюдаемой, а затем выдает значения из второй наблюдаемой. После этого изменения фрагмент будет печатать как минимум ~ 800 чисел до переполнения стека.
Причина переполнения стека состоит в том, что Concat
создает наблюдаемую область, которая вызывает Concat
, чтобы создать другую наблюдаемую область, которая вызывает Concat
и т. Д. Одним из способов решения этой проблемы является добавление некоторой синхронизации. Если вы используете Windows Forms, вы можете изменить Delay
так, чтобы он планировал наблюдаемое в потоке GUI (который отбрасывает текущий стек). Вот эскиз:
type RxBuilder() =
member this.Delay f =
let sync = System.Threading.SynchronizationContext.Current
let res = Observable.Defer f
{ new IObservable<_> with
member x.Subscribe(a) =
sync.Post( (fun _ -> res.Subscribe(a) |> ignore), null)
// Note: This is wrong, but we cannot easily get the IDisposable here
null }
member this.Combine (xs, ys) = Observable.Concat(xs, ys)
member this.Yield x = Observable.Return x
member this.YieldFrom (xs:IObservable<_>) = xs
Для правильной реализации вам нужно написать собственный метод Concat
, который довольно сложен. Идея была бы такова:
- Concat возвращает некоторый специальный тип, например
IConcatenatedObservable
- Когда метод вызывается рекурсивно, вы создаете цепочку
IConcatenatedObservable
, которые ссылаются друг на друга
- Метод
Concat
будет искать эту цепочку и когда, например, три объекта, он опустит средний (чтобы длина цепочки не превышала 2).
Это слишком сложно для ответа StackOverflow, но это может быть полезным отзывом для команды Rx.