Я столкнулся с чем-то странным при использовании System.Reactive
.Может быть, это обычное поведение, но оно не имеет для меня большого смысла.
Давайте возьмем следующий код:
Subject<IObservable<long>> X = new Subject<IObservable<long>>();
IObservable<long> I = Observable.Interval(TimeSpan.FromSeconds(1));
async Task Main()
{
X.Switch().Subscribe(x => Console.WriteLine($"switched_1: {x}"));
I.Subscribe(x => Console.WriteLine($"direct_1: {x}"));
X.Switch().Subscribe(x => Console.WriteLine($"switched_2: {x}"));
I.Subscribe(x => Console.WriteLine($"direct_2: {x}"));
await Task.Factory.StartNew(async () =>
{
await Task.Delay(TimeSpan.FromSeconds(5)).ConfigureAwait(false);
X.Switch().Subscribe(x => Console.WriteLine($"switched_3 !!!: {x}"));
I.Subscribe(x => Console.WriteLine($"direct_3: {x}"));
});
X.OnNext(Observable.Interval(TimeSpan.FromSeconds(1)));
Console.ReadLine();
X.OnNext(Observable.Interval(TimeSpan.FromSeconds(1)));
Console.WriteLine("New observable emited");
Console.ReadLine();
}
Наблюдаемое, помеченное знаком !!!никогда не срабатывает, пока не будет выпущен второй интервал.
[Обновить]
Я думаю, что я знаю, что происходит: я подписываюсьв восходящий поток наблюдается каждый раз с новым переключателем.И пока я это делаю, я получу уведомление только о наблюдаемой, излучаемой после подписки, и я не могу «подключиться» к текущей наблюдаемой.Я думал, что с помощью переключить только один раз, и подписаться на получившуюся наблюдаемую позже поможет:
Subject<IObservable<long>> X = new Subject<IObservable<long>>();
IObservable<long> XI;
void Main()
{
XI = X.Switch().AsObservable();
XI.Subscribe(x => Console.WriteLine($"switched_1: {x}"));
XI.Subscribe(x => Console.WriteLine($"switched_2: {x}"));
X.OnNext(Observable.Interval(TimeSpan.FromSeconds(1)));
XI.Subscribe(x => Console.WriteLine($"switched_3 !!!: {x}"));
Console.ReadLine();
X.OnNext(Observable.Interval(TimeSpan.FromSeconds(1)));
Console.WriteLine("New observable emited");
Console.ReadLine();
}
Но не стал: (
[Обновление 2]
Кажется, я нашел решение, которое действительно работает, но я не уверен, правильно ли оно используется или нет.
Subject<IObservable<long>> X = new Subject<IObservable<long>>();
IObservable<long> XI;
async Task Main()
{
XI = X.Switch().Publish().AutoConnect();
...
Как я могу заставить его работать изсамое начало?