Я начинаю с реактивных расширений и у меня возникла проблема, когда я не уверен, что я на правильном пути.
Я использую Observable для создания и использования слушателя.для брокера событий с .NET.Я создал класс «IncomingMessage», который содержит сообщения от событийного брокера по мере их поступления, и я начинаю создавать прослушиватель в функции Observerable.Create.Это работает очень хорошо.
Теперь я также хочу получить уведомление о состоянии от слушателя, как в «Подключение ...», «Подключено», «Закрытие ...», которые не являются IncomingMessage, поэтому я создалкласс "BrokerEvent" со свойством "Message" и интерфейсом для "IncomingMessage" и "BrokerEvent".Теперь я отправляю оба через обозреватель. OnNext (...), как они происходят.Это также хорошо работает до сих пор.
Однако на стороне подписчика у меня сейчас есть небольшая проблема с фильтрацией нужных мне событий.
Я делаю:
GetObservable().Where(x => x is BrokerEvent ||
(x is IncomingMessage msg &&
msg.User == "test")).Subscribe(...)
Это работает, однако мне нужно снова выяснить тип в Subscribe, который кажется немного уродливым.
Попробовав немного, я закончил делать это сейчас ...
var observable = GetObservable().Publish();
observable.OfType<BrokerEvent>().Subscribe(...);
observable.OfType<IncomingMessage>().Where(x=>x.User == "test").Subscribe(...);
var disposable = observable.Connect();
Это также, кажется, работает, но, поскольку я новичок в реактивных расширениях, я не совсем уверен, есть ли у этого нежелательные побочные эффекты.Я также не уверен, является ли это «правильным» способом включать сообщения о статусе в поток вообще.Есть ли лучший способ справиться с этим (возможно, без использования Publish), или это путь?
И чтобы прекратить прослушивание, достаточно просто утилизировать одноразовое устройство, полученное от .Connect (), илинужно также утилизировать оба расходных материала, которые я получаю .Subscribe ()?
Спасибо!