Правильная подписка на текущую наблюдаемую при использовании переключателя на наблюдаемый поток - PullRequest
0 голосов
/ 30 ноября 2018

Я столкнулся с чем-то странным при использовании System.Reactive.Может быть, это обычное поведение, но оно не имеет для меня большого смысла.

Давайте возьмем следующий код:

Subject<IObservable<long>> X = new Subject<IObservable<long>>();

IObservable<long> I = Observable.Interval(TimeSpan.FromSeconds(1));

async Task Main()
{

    X.Switch().Subscribe(x => Console.WriteLine($"switched_1: {x}"));
    I.Subscribe(x => Console.WriteLine($"direct_1: {x}"));
    X.Switch().Subscribe(x => Console.WriteLine($"switched_2: {x}"));
    I.Subscribe(x => Console.WriteLine($"direct_2: {x}"));

    await Task.Factory.StartNew(async () =>
    {
        await Task.Delay(TimeSpan.FromSeconds(5)).ConfigureAwait(false);
        X.Switch().Subscribe(x => Console.WriteLine($"switched_3 !!!: {x}"));
        I.Subscribe(x => Console.WriteLine($"direct_3: {x}"));
    });

    X.OnNext(Observable.Interval(TimeSpan.FromSeconds(1)));
    Console.ReadLine();
    X.OnNext(Observable.Interval(TimeSpan.FromSeconds(1)));
    Console.WriteLine("New observable emited");
    Console.ReadLine();
}

Наблюдаемое, помеченное знаком !!!никогда не срабатывает, пока не будет выпущен второй интервал.

enter image description here

[Обновить]

Я думаю, что я знаю, что происходит: я подписываюсьв восходящий поток наблюдается каждый раз с новым переключателем.И пока я это делаю, я получу уведомление только о наблюдаемой, излучаемой после подписки, и я не могу «подключиться» к текущей наблюдаемой.Я думал, что с помощью переключить только один раз, и подписаться на получившуюся наблюдаемую позже поможет:

Subject<IObservable<long>> X = new Subject<IObservable<long>>();

IObservable<long> XI;

void Main()
{
    XI = X.Switch().AsObservable();

    XI.Subscribe(x => Console.WriteLine($"switched_1: {x}"));
    XI.Subscribe(x => Console.WriteLine($"switched_2: {x}"));

    X.OnNext(Observable.Interval(TimeSpan.FromSeconds(1)));
    XI.Subscribe(x => Console.WriteLine($"switched_3 !!!: {x}"));
    Console.ReadLine();
    X.OnNext(Observable.Interval(TimeSpan.FromSeconds(1)));
    Console.WriteLine("New observable emited");
    Console.ReadLine();
}

Но не стал: (

[Обновление 2]

Кажется, я нашел решение, которое действительно работает, но я не уверен, правильно ли оно используется или нет.

Subject<IObservable<long>> X = new Subject<IObservable<long>>();

IObservable<long> XI;

async Task Main()
{
    XI = X.Switch().Publish().AutoConnect();
...

Как я могу заставить его работать изсамое начало?

1 Ответ

0 голосов
/ 03 декабря 2018

Ваше объяснение в разделе [Обновление] является правильным, потому что Task.Factory.StartNew возвращает задачу вместо задачи.Вы должны использовать двойной await или Task.Run , если вы хотите, чтобы подписка происходила до того, как вы вызовете OnNext ().

Однако только с отображением XI-Observableкак в [Update2] и скрыть тот факт, что есть что-то, что переключается под это жизнеспособный вариант.

С помощью Publish (). AutoConnect () вы преобразуете наблюдаемое из холодного в горячее.Вы также можете попробовать BehaviorSubject или ReplaySubject в своем фрагменте, чтобы понять разницу между горячей и холодной.Если вы понимаете эти различия, вам должно быть гораздо яснее, как должно выглядеть действительное решение, которое вы имеете в виду.

...