Как генерировать наблюдаемую из начальной наблюдаемой, когда я могу генерировать только другие наблюдаемые? - PullRequest
4 голосов
/ 08 сентября 2011

Я хочу создать наблюдаемую область, в которой каждое значение наблюдаемой зависит от предыдущей, начиная с одного значения. Если у меня есть простое преобразование между значениями, такими как Func<int, int>, это легко сделать с Observable.Generate, например, так:

Func<int, IObservable<int>> mkInts = init =>
    Observable.Generate(
        init,         // start value
        _ => true,    // continue ?
        i => i + 1,   // transformation function
        i => i);      // result selector

using (mkInts(1).Subscribe(Console.WriteLine))
{
    Console.ReadLine();
}

Это будет радостно писать цифры на моем экране, пока я не нажму клавишу ввода Тем не менее, моя функция преобразования выполняет некоторые операции сетевого ввода-вывода, поэтому тип равен Func<int, IObservable<int>>, поэтому я не могу использовать этот подход. Вместо этого я попробовал это:

// simulate my transformation function
Func<int, IObservable<int>> mkInt = ts =>
    Observable.Return(ts)
              .Delay(TimeSpan.FromMilliseconds(10));

// pre-assign my generator function, since the function calls itself recursively
Func<int, IObservable<int>> mkInts = null;

// my generator function
mkInts = init =>
{
    var ints = mkInt(init); 

    // here is where I depend on the previous value.
    var nextInts = ints.SelectMany(i => mkInts(i + 1)); 
    return ints.Concat(nextInts);
};

using (mkInts(1).Subscribe(Console.WriteLine))
{
    Console.ReadLine();
}

Но это будет переполнение стека после печати около 5000 номеров. Как я могу решить это?

Ответы [ 5 ]

3 голосов
/ 17 октября 2011

Я считаю, что следующий ответ правильный, но слишком сложныйЕдинственное изменение, которое я предлагаю, это метод mkInts:

Func<int, Func<int, int>, IObservable<int>> mkInts = (i0, fn) =>
   {
      var s = new Subject<int>();
      s.ObserveOn(Scheduler.NewThread).Select(fn).Subscribe(s);
      s.OnNext(i0);
      return s;
   };
3 голосов
/ 09 сентября 2011

Я думаю, у меня есть хорошее чистое решение для вас.

Сначала вернитесь к использованию Func<int, int> - его легко превратить в Func<int, IObservable<int>> с помощью Observable.FromAsyncPattern.

Я использовал это для тестирования:

Func<int, int> mkInt = ts =>
{
    Thread.Sleep(100);
    return ts + 1;
};

Теперь вот деньги:

Func<int, Func<int, int>, IObservable<int>> mkInts = (i0, fn) =>
    Observable.Create<int>(o =>
    {
        var ofn = Observable
            .FromAsyncPattern<int, int>(
                fn.BeginInvoke,
                fn.EndInvoke);

        var s = new Subject<int>();

        var q = s.Select(x => ofn(x)).Switch();

        var r = new CompositeDisposable(new IDisposable[]
        {
            q.Subscribe(s),
            s.Subscribe(o),
        });

        s.OnNext(i0);

        return r;
    });

Итеративная функция превращается в асинхронную наблюдаемую.

Переменная q подает значения от субъекта в наблюдаемую итерационную функцию и выбирает вычисленную наблюдаемую. Метод Switch выравнивает результат и обеспечивает правильную очистку каждого вызова наблюдаемой итерационной функции.

Кроме того, использование CompositeDisposable позволяет использовать две подписки как одну. Очень аккуратно!

Его легко использовать так:

using (mkInts(7, mkInt).Subscribe(Console.WriteLine))
{
    Console.ReadLine();
}

Теперь у вас есть полностью параметризованная версия функции генератора. Хорошо, а?

1 голос
/ 09 сентября 2011

Я считаю, что код не является хвостовой рекурсией и, следовательно, вызывает исключение SO. Ниже приведен код, который работает без каких-либо исключений.

public static IObservable<int> GetObs(int i)
{
   return Observable.Return(i).Delay(TimeSpan.FromMilliseconds(10));
}
public static IObservable<int> MakeInts(int start)
{
   return Observable.Generate(start, _ => true, i => i + 1, i => GetObs(i))
                .SelectMany(obs => obs);
}


using (MakeInts(1).Subscribe(Console.WriteLine))
{
    Console.ReadLine();
}

Или изменив свой код, например:

Action<int, IObserver<int>> mkInt = (i,obs) =>
               Observable.Return(i)
              .Delay(TimeSpan.FromMilliseconds(10)).Subscribe<int>(ii => obs.OnNext(ii));

            // pre-assign my generator function, since the function calls itself recursively
            Func<int, IObservable<int>> mkInts = null;
            // my generator function
            mkInts = init =>
            {
                var s = new Subject<int>();
                var ret = s.Do(i => {
                    mkInt(i + 1, s);
                });
                mkInt(init, s);
                return ret;
            };

            using (mkInts(1).Subscribe(Console.WriteLine))
            {
                Console.ReadLine();
            }
1 голос
/ 09 сентября 2011

Я не был полностью уверен, хотите ли вы снова передать конечный результат функции обратно в функцию или если вы хотите иметь отдельную функцию, которая получит следующий вход, поэтому я сделал и то, и другое.Хитрость здесь в том, чтобы позволить IScheduler выполнять тяжелые операции по повторным вызовам.

public Func<T, IObservable<T>> Feedback<T>(Func<T, IObservable<T>> generator, 
                                           IScheduler scheduler)
{
    return seed =>
             Observable.Create((IObserver<T> observer) =>
                 scheduler.Schedule(seed,
                     (current, self) =>
                         generator(current).Subscribe(value => 
                            {
                                observer.OnNext(value);
                                self(value);
                            })));
}

public Func<T, IObservable<T>> GenerateAsync<T>(Func<T, IObservable<T>> generator,
                                                Func<T, T> seedTransform,
                                                IScheduler scheduler)
{
    return seed =>
             Observable.Create((IObserver<T> observer) =>
                 scheduler.Schedule(seed,
                     (current, self) =>
                         generator(current).Subscribe(value =>
                         {
                             observer.OnNext(value);
                             self(seedTransform(current));
                         })));
}
0 голосов
/ 08 сентября 2011

Я нашел решение, которое, хотя и может быть не самым красивым, делает то, что я хочу. Если у кого-то есть лучшее решение, я отмечу это как ответ.

Func<int, IObservable<int>> mkInt = ts =>
    Observable.Return(ts)
              .Delay(TimeSpan.FromMilliseconds(10));

Func<int, IObservable<int>> mkInts = init =>
{
    Subject<int> subject = new Subject<int>();
    IDisposable sub = null;
    Action<int> onNext = null;
    onNext = i =>
    {
        subject.OnNext(i);
        sub.Dispose();
        sub = mkInt(i + 1).Subscribe(onNext);
    };
    sub = mkInt(init).Subscribe(onNext);
    return subject;
};

using (mkInts(1).Subscribe(Console.WriteLine))
{
    Console.ReadLine();
}
...