Перезапуск наблюдаемой подписки при изменении свойства - PullRequest
3 голосов
/ 27 мая 2020

У меня есть простая подписка, созданная в MyClass:

//myService.Connect returns IObservable<MyData>
myService.Connect(requestParameters)
         .Subscribe(DoSomething);

В MyClass есть свойство, которое я слушаю, которое используется для создания requestParameters . Моя цель - снова вызывать myService.Connect всякий раз, когда это свойство изменяется, используя новые параметры requestParameters и отказываясь от подписки на предыдущую подписку.

Есть ли простой способ сделать это с помощью RX? Я смотрел Switch, но он используется при подписке на Observable, который испускает Observables. Я также посмотрел на TakeUntil, и я могу поддерживать подписку до тех пор, пока свойство не изменится, но я не уверен, как я могу вызвать автоматическую c повторную подписку.

В конце дня я могу просто вызывайте myService. Подключайтесь к себе всякий раз, когда свойство изменяется, но я хотел бы увидеть, есть ли какие-то существующие функции RX, которые я могу использовать вместо этого.

1 Ответ

3 голосов
/ 27 мая 2020

В зависимости от того, что именно вы хотите сделать, оператор Switch действительно может быть тем, что вы ищете. Его можно использовать для предоставления одной простой наблюдаемой для publi c, где лежащая в основе наблюдаемая наблюдаемая может быть "переключена" на другую наблюдаемую. Пока тип такой же, он может работать. Фактически, оператор Switch() требует, чтобы наблюдаемые были одного типа. См. Следующее изображение из документации выше:

Marble diagram of the Switch operator Мраморная диаграмма оператора Switch из http://reactivex.io

Верхняя часть показывает наблюдаемые, которые активны и отправляют значения в «видимый» наблюдаемый внизу. Но в какой-то момент исходный источник изменяется или «переключается» (круги на треугольники), в то время как «видимый» наблюдаемый внизу дает значения, как будто ничего не произошло.

В вашем случае вы получаете новый наблюдаемый с помощью myService.Connect(requestParameters). Вы вводите это значение в тему IObservable<YourType> с помощью метода OnNext(). Посмотрите на следующий пример:

ISubject<IObservable<int>> sourceOfObservables = new Subject<IObservable<int>>();

IObservable<int> publicSource = sourceOfObservables.Switch();

IDisposable testSubscription = publicSource.Subscribe(it => {
    Console.WriteLine("Value received: "+it);
});

sourceOfObservables.OnNext(Observable.Range(1, 5));
sourceOfObservables.OnNext(Observable.Range(10, 5));
sourceOfObservables.OnNext(Observable.Range(100, 5));

Это сгенерирует следующий вывод:

Value received: 10
Value received: 11
Value received: 12
Value received: 13
Value received: 14
Value received: 100
Value received: 101
Value received: 102
Value received: 103
Value received: 104

Как видите, у вас есть только одна активная подписка testSubscription, где вы можете прочитать 15 различных ценности. Однако они происходят из трех отдельных источников / наблюдаемых. Это похоже на Concat, где вы можете объединить наблюдаемые объекты, но на этот раз вы как бы «заменяете» одну активную подписку другой, без необходимости повторной подписки на новую наблюдаемую.

Вы можете решить эту проблему самостоятельно. проблема, сначала определив ISubject<IObservable<YourType>>. Затем, когда ваш requestParameters изменяет ваш pu sh новый экземпляр IObservable<YourType> из вашего myService.Connect() метода на ваш ISubject<IObservable<YourType>>, с помощью метода OnNext() (как показано в примере). С этого момента следующие значения берутся из , что наблюдаемое.

Бонус:

Когда requestParameters уже получено из наблюдаемого, вы можете использовать оператор Select() (из System.Reactive.Linq, а не из обычного LINQ) для автоматического построения IObservable<YourType>, который вы можете «сгладить», «псевдосогласовать» или буквально «переключить» с помощью Switch():

IObservable<string> requestParametersChangeStream;

requestParametersChangeStream             // produces "string" objects 
    .Select(it => myService.Connect(it))  // produces "IObservable<YourType>" objects
    .Switch()                             // make it so it looks like
                                          // an IObservable<YourType>
    .Subscribe(it => DoWhateverYouWant(it));
...