Подписаться / прослушать «побочные события» потока с помощью System.Reactive - PullRequest
0 голосов
/ 06 июня 2018

Я начинаю с реактивных расширений и у меня возникла проблема, когда я не уверен, что я на правильном пути.

Я использую Observable для создания и использования слушателя.для брокера событий с .NET.Я создал класс «IncomingMessage», который содержит сообщения от событийного брокера по мере их поступления, и я начинаю создавать прослушиватель в функции Observerable.Create.Это работает очень хорошо.

Теперь я также хочу получить уведомление о состоянии от слушателя, как в «Подключение ...», «Подключено», «Закрытие ...», которые не являются IncomingMessage, поэтому я создалкласс "BrokerEvent" со свойством "Message" и интерфейсом для "IncomingMessage" и "BrokerEvent".Теперь я отправляю оба через обозреватель. OnNext (...), как они происходят.Это также хорошо работает до сих пор.

Однако на стороне подписчика у меня сейчас есть небольшая проблема с фильтрацией нужных мне событий.

Я делаю:

GetObservable().Where(x => x is BrokerEvent ||
                        (x is IncomingMessage msg &&
                            msg.User == "test")).Subscribe(...)

Это работает, однако мне нужно снова выяснить тип в Subscribe, который кажется немного уродливым.

Попробовав немного, я закончил делать это сейчас ...

var observable = GetObservable().Publish();

observable.OfType<BrokerEvent>().Subscribe(...);
observable.OfType<IncomingMessage>().Where(x=>x.User == "test").Subscribe(...);

var disposable = observable.Connect();

Это также, кажется, работает, но, поскольку я новичок в реактивных расширениях, я не совсем уверен, есть ли у этого нежелательные побочные эффекты.Я также не уверен, является ли это «правильным» способом включать сообщения о статусе в поток вообще.Есть ли лучший способ справиться с этим (возможно, без использования Publish), или это путь?

И чтобы прекратить прослушивание, достаточно просто утилизировать одноразовое устройство, полученное от .Connect (), илинужно также утилизировать оба расходных материала, которые я получаю .Subscribe ()?

Спасибо!

Ответы [ 2 ]

0 голосов
/ 07 июня 2018

Что вы можете сделать, это создать класс «обработчик событий» с тремя перегрузками «сообщения процесса».Один для объекта (по умолчанию), который ничего не делает, один для типа статуса, один для входящего сообщения.В .Subscribe используйте этот синтаксис

m=>processor.Process((dynamic)m)

Это вызовет правильную реализацию или ничего не сделает, как требуется.

Если вы хотите фильтровать перед вызовом процесса, вы можете ввести общий класс(ProcessableMesage или что-то подобное), или вы можете вызвать .Merge для ваших потоков OfType, или вы можете использовать тот же подход, что и выше, имея динамический класс MessageFilter.

0 голосов
/ 06 июня 2018

Я предполагаю, GetObservable возвращает IObservable<object>, что не идеально.

Лучший способ выполнить код, который вы указали выше, выглядит следующим образом:

var observable = GetObservable().Publish();

var sub1 = observable.OfType<BrokerEvent>().Subscribe(_ => { });
var sub2 = observable.OfType<IncomingMessage>().Where(x => x.User == "test").Subscribe(_ => { });
var connectSub = observable.Connect();
var disposable = new CompositeDisposable(sub1, sub2, connectSub);

После этого составной одноразовый утилизирует всех дочерних элементов.

Если два потока сообщений не имеют никакого отношения друг к другу, этот подход будет работать.Однако, поскольку у вас в основном есть поток управляющих сообщений и поток сообщений данных, я предполагаю, что сообщения от одного из них могут быть важны при обработке сообщений в другом.В этом случае вы можете рассматривать это как один поток.

В этом случае вы можете захотеть создать тип Discrimination-Union для вашей наблюдаемой, что может упростить обработку.

...