Изменить это:
IDisposable subscription = obs.SubscribeOn(Scheduler.NewThread)
к этому:
IDisposable subscription = obs.ObserveOn(Scheduler.NewThread)
Вы должны заметить, что вы делаете плохо, когда речь идет о Rx. Вы входите и выходите из наблюдаемых. Вам следует избегать этого везде, где это возможно.
Так, например, избегайте этого:
var list = new List<int> { 1, 2, 3 };
var obs = list.ToObservable();
когда это то же самое:
var obs = Observable.Range(1, 3);
Также весь метод static int Add(IObservable<int> wholeList)
плох. Он вызывает ForEach
(что обычно должно быть предупреждением о том, что вы делаете что-то не так), чтобы убрать значения из наблюдаемой. Вот где может произойти тупиковая блокировка.
Уже существует наблюдаемое расширение, называемое Sum
, которое возвращает IObservble<int>
, и это не выводит вас из наблюдаемого.
Так что попробуйте написать свой код так:
var obs = Observable.Range(1, 3);
var query =
from n in obs
from s in obs.Sum()
select new
{
Number = n.ToString(),
Sum = s.ToString(),
};
using (var subscription = query.SubscribeOn(Scheduler.NewThread).Subscribe(
x =>
{
Console.WriteLine(x.Number);
Console.WriteLine(x.Sum);
},
err =>
Console.WriteLine("Error"),
() =>
Console.WriteLine("Sequence Completed")))
{
Console.ReadLine();
}
Надеюсь, это поможет.