доступ к IObservable внутри той же подписки IObservable - PullRequest
1 голос
/ 11 октября 2011

Вот простой пример того, что я пытаюсь сделать с Reactive Extensions, но он не работает

Добавить не работает в этом простом примере

    public static void Main(string[] args)
    {
        var list = new List<int> { 1, 2, 3 };
        var obs = list.ToObservable();
        IDisposable subscription = obs.SubscribeOn(Scheduler.NewThread).Subscribe(p =>
        {
            Console.WriteLine(p.ToString());
            Console.WriteLine(Add(obs).ToString());
        },
        err => Console.WriteLine("Error"),
        () => Console.WriteLine("Sequence Completed")
        );
        Console.ReadLine();
        subscription.Dispose();
    }

    private static int Add(IObservable<int> wholeList)
    {
        int sum = 0;
        wholeList.ForEach(i => sum = sum + i);
        return sum;
    }

Фактический вывод

1
_

Требуемый вывод

1
6
2
6
3
6
Sequence Completed
_

т.е. я бы хотел выполнить метод Add (obs) внутри каждой итерации, где obs сам является холодной IObservable, проходящейитерация

Ответы [ 2 ]

2 голосов
/ 11 октября 2011

Изменить это:

IDisposable subscription = obs.SubscribeOn(Scheduler.NewThread)

к этому:

IDisposable subscription = obs.ObserveOn(Scheduler.NewThread)

Вы должны заметить, что вы делаете плохо, когда речь идет о Rx. Вы входите и выходите из наблюдаемых. Вам следует избегать этого везде, где это возможно.

Так, например, избегайте этого:

    var list = new List<int> { 1, 2, 3 };
    var obs = list.ToObservable();

когда это то же самое:

    var obs = Observable.Range(1, 3);

Также весь метод static int Add(IObservable<int> wholeList) плох. Он вызывает ForEach (что обычно должно быть предупреждением о том, что вы делаете что-то не так), чтобы убрать значения из наблюдаемой. Вот где может произойти тупиковая блокировка.

Уже существует наблюдаемое расширение, называемое Sum, которое возвращает IObservble<int>, и это не выводит вас из наблюдаемого.

Так что попробуйте написать свой код так:

var obs = Observable.Range(1, 3);

var query =
    from n in obs
    from s in obs.Sum()
    select new
    {
        Number = n.ToString(),
        Sum = s.ToString(),
    };

using (var subscription = query.SubscribeOn(Scheduler.NewThread).Subscribe(
    x =>
        {
            Console.WriteLine(x.Number);
            Console.WriteLine(x.Sum);
        },
    err =>
        Console.WriteLine("Error"),
    () =>
        Console.WriteLine("Sequence Completed")))
{
    Console.ReadLine();
}

Надеюсь, это поможет.

1 голос
/ 12 октября 2011

Что касается вашего комментария, я бы посоветовал вам сделать заметное для создания элементов по мере необходимости, а не делать это после подписки.В вашем примере вы можете сделать что-то вроде:

var list = new List<int> { 1, 2, 3 };
var obs = list.ToObservable().Select(i => new Tuple<int,IObservable<int>>(i,list.ToObservable()));

obs.SubscribeOn(Scheduler.NewThread).Subscribe(t => {
  Console.WriteLine(t.Item1);
  SaveItems(t.Item2);
});
...