Как изменить реализацию Rx Builder, чтобы исправить исключение переполнения стека? - PullRequest
11 голосов
/ 28 мая 2011

Я пытаюсь найти Rx Builder для использования Reactive Extension в синтаксисе выражения вычисления F #.Как я могу это исправить, чтобы он не взорвал стек?Как пример Seq ниже.И есть ли планы предоставить реализацию RxBuilder как часть Reactive Extensions или как часть будущих версий .NET Framework?

open System
open System.Linq
open System.Reactive.Linq

type rxBuilder() =    
    member this.Delay f = Observable.Defer f
    member this.Combine (xs: IObservable<_>, ys : IObservable<_>) = 
        Observable.merge xs ys      
    member this.Yield x = Observable.Return x
    member this.YieldFrom (xs:IObservable<_>) = xs

let rx = rxBuilder()

let rec f x = seq { yield x 
                    yield! f (x + 1) }

let rec g x = rx { yield x 
                    yield! g (x + 1) }


//do f 5 |> Seq.iter (printfn "%A")

do g 5 |> Observable.subscribe (printfn "%A") |> ignore

do System.Console.ReadLine() |> ignore

Ответы [ 5 ]

9 голосов
/ 05 сентября 2012

Обратите внимание, что это было исправлено в Rx v2.0 (как уже упоминалось здесь), в более общем плане для всех операторов секвенирования (Concat, Catch, OnErrorResumeNext), а также для императивных операторов (If, While и т. Д.) .

По сути, вы можете думать об этом классе операторов, как о подписке на другую последовательность в сообщении наблюдателя терминала (например, Concat подписывается на следующую последовательность после получения сообщения OnCompleted текущего), где и происходит аналогия хвостовой рекурсии. в.

В Rx v2.0 все хвостовые рекурсивные подписки сведены в структуру, похожую на очередь, для обработки по одной за раз, общаясь с нижестоящим наблюдателем. Это позволяет избежать неограниченного роста числа наблюдателей, разговаривающих друг с другом для последовательных подписок на последовательности.

9 голосов
/ 28 мая 2011

Короткий ответ: Rx Framework не поддерживает генерацию наблюдаемых объектов с использованием рекурсивного шаблона, подобного этому, поэтому это нелегко сделать. Операция Combine, используемая для последовательностей F #, нуждается в особой обработке, которую не обеспечивают наблюдаемые. Rx Framework, вероятно, ожидает, что вы сгенерируете наблюдаемые с помощью Observable.Generate, а затем будете использовать компилятор запросов LINQ / F # для их обработки.

Во всяком случае, вот некоторые мысли -

Прежде всего, вам необходимо заменить Observable.merge на Observable.Concat. Первый запускает обе наблюдаемые параллельно, а второй сначала выдает все значения из первой наблюдаемой, а затем выдает значения из второй наблюдаемой. После этого изменения фрагмент будет печатать как минимум ~ 800 чисел до переполнения стека.

Причина переполнения стека состоит в том, что Concat создает наблюдаемую область, которая вызывает Concat, чтобы создать другую наблюдаемую область, которая вызывает Concat и т. Д. Одним из способов решения этой проблемы является добавление некоторой синхронизации. Если вы используете Windows Forms, вы можете изменить Delay так, чтобы он планировал наблюдаемое в потоке GUI (который отбрасывает текущий стек). Вот эскиз:

type RxBuilder() =   
  member this.Delay f = 
      let sync = System.Threading.SynchronizationContext.Current 
      let res = Observable.Defer f
      { new IObservable<_> with
          member x.Subscribe(a) = 
            sync.Post( (fun _ -> res.Subscribe(a) |> ignore), null)
            // Note: This is wrong, but we cannot easily get the IDisposable here
            null }
  member this.Combine (xs, ys) = Observable.Concat(xs, ys)
  member this.Yield x = Observable.Return x
  member this.YieldFrom (xs:IObservable<_>) = xs

Для правильной реализации вам нужно написать собственный метод Concat, который довольно сложен. Идея была бы такова:

  • Concat возвращает некоторый специальный тип, например IConcatenatedObservable
  • Когда метод вызывается рекурсивно, вы создаете цепочку IConcatenatedObservable, которые ссылаются друг на друга
  • Метод Concat будет искать эту цепочку и когда, например, три объекта, он опустит средний (чтобы длина цепочки не превышала 2).

Это слишком сложно для ответа StackOverflow, но это может быть полезным отзывом для команды Rx.

4 голосов
/ 07 июня 2011

Это было исправлено в Rx 2.0 Beta . А вот и тест .

3 голосов
/ 01 июня 2011

А что-то вроде этого?

type rxBuilder() =    
   member this.Delay (f : unit -> 'a IObservable) = 
               { new IObservable<_> with
                    member this.Subscribe obv = (f()).Subscribe obv }
   member this.Combine (xs:'a IObservable, ys: 'a IObservable) =
               { new IObservable<_> with
                    member this.Subscribe obv = xs.Subscribe obv ; 
                                                ys.Subscribe obv }
   member this.Yield x = Observable.Return x
   member this.YieldFrom xs = xs

let rx = rxBuilder()

let rec f x = rx { yield x 
                   yield! f (x + 1) }

do f 5 |> Observable.subscribe (fun x -> Console.WriteLine x) |> ignore

do System.Console.ReadLine() |> ignore

http://rxbuilder.codeplex.com/ (созданный для экспериментов с RxBuilder)

Одноразовые xs не подключены.Как только я пытаюсь подключить одноразовое устройство, оно снова начинает взрываться.

2 голосов
/ 29 мая 2011

Если мы удалим синтаксический сахар из этого вычислительного выражения (он же Monad), мы получим:

let rec g x = Observable.Defer (fun () -> Observable.merge(Observable.Return x, g (x + 1) )

Или в C #:

public static IObservable<int> g(int x)
{
    return Observable.Defer<int>(() =>
    {
      return Observable.Merge(Observable.Return(x), g(x + 1));                    
    });
}

Что определенно не является хвостовой рекурсией. Я думаю, что если вы сможете сделать его хвостовым рекурсивным, то это, вероятно, решит вашу проблему

...