rx. net объединить поток сброса - PullRequest
1 голос
/ 29 января 2020

У нас есть работающий сервис, который обрабатывает сообщения от системы x к системе y. В основном это выглядит следующим образом:

aSystem.Messages.Subscribe(message => 
{
   try
   {
      ProcessMessage(message);
   }
   catch(Exception ex)
   {
      _logger.LogFatal(ex.Message);
   }
})

Проблема в том, что мы получаем по крайней мере одно сообщение каждую секунду, а наше сообщение LogFatal настроено на отправку электронной почты. В результате почтовый ящик взорвался в определенный момент.

Код «улучшен» путем добавления пользовательского класса Logging, который будет содержать последнюю метку времени. Исходя из этой отметки времени, сообщение будет регистрироваться или нет.

Это выглядит громоздко, и я думаю, что это идеальный сценарий для Rx.NET. Нам нужно следующее:

  • 1) Записать, если строка изменится
  • 2) Записать, если прошло определенное время

Что я пробовал следующее:

var logSubject = new Subject<string>();
var logMessagesChangedStream = logSubject.DistinctUntilChanged(); // log on every message change
var logMessagesSampleStream = logSubject.Sample(TimeSpan.FromSeconds(10)); // log at least every 10 seconds

var subscription = logMessagesChangedStream.Merge(logMessagesSampleStream).Subscribe(result =>
{
    _logger.LogFatal(result);
});

aSystem.Messages.Subscribe(message => 
{
   try
   {
      ProcessMessage(message);
   }
   catch(Exception ex)
   {
      logSubject.OnNext(ex.Message);
   }
})

Похоже, это работает, но при этом сообщение будет записано дважды, один раз для DistinctUntilChanged и один раз для Sample. Так или иначе, я должен сбросить потоки, если один из них выпустил значение. Они прекрасно работают независимо, после слияния они должны слушать друг друга; -)

1 Ответ

2 голосов
/ 30 января 2020

Есть неоднозначный оператор Amb, который гоняет две последовательности, чтобы увидеть, кто победит первым.

Observable.Amb(logMessagesChangedStream, logMessagesSampleStream) 

Поток победителей продолжает распространяться до конца - мы этого не хотим. Мы заинтересованы в том, чтобы снова начать гонку за следующей ценностью. Давайте сделаем это:

    Observable.Amb(logMessagesChangedStream, logMessagesSampleStream)
      .Take(1)
      .Repeat()

Теперь последняя проблема заключается в том, что DistinctUntilChanged теряет свое состояние каждый раз, когда мы перезапускаем гонку, и его поведение равно pu sh самому первому значению, которое он получает немедленно. Давайте исправим это, превратив его в горячую наблюдаемую.

logSubject.DistinctUntilChanged().Publish();

Собираем все вместе:

var logSubject = new Subject<string>();
var logMessagesChangedStream = logSubject.DistinctUntilChanged().Publish(); // log on every message change
var logMessagesSampleStream = logSubject.Sample(TimeSpan.FromSeconds(5)); // log at least every 5 seconds    

var oneof =
        Observable
          .Amb(logMessagesChangedStream, logMessagesSampleStream)
          .Take(1)
          .Repeat(); 

   logMessagesChangedStream.Connect();
   oneof.Timestamp().Subscribe(c => Console.WriteLine(c));

Изменил 10 секунд на 5, , потому что .

Тест

    Action<string> onNext = logSubject.OnNext;

    onNext("a");
    onNext("b");
    Delay(1000, () => { onNext("c"); onNext("c"); onNext("c"); onNext("d"); });
    Delay(3000, () => { onNext("d"); onNext("e"); });
    Delay(6000, () => { onNext("e"); });
    Delay(10000, () => { onNext("e"); });

Выход

a@1/30/2020 12:17:52 PM +00:00
b@1/30/2020 12:17:52 PM +00:00
c@1/30/2020 12:17:53 PM +00:00
d@1/30/2020 12:17:53 PM +00:00
e@1/30/2020 12:17:55 PM +00:00
e@1/30/2020 12:18:00 PM +00:00
e@1/30/2020 12:18:05 PM +00:00
...