Задержка и дедупликация с использованием Reactive Extensions (Rx) - PullRequest
9 голосов
/ 19 января 2011

Я хочу использовать Reactive Extensions для преобразования некоторых сообщений и ретрансляции их после небольшой задержки.

Сообщения выглядят примерно так:

class InMsg
{
   int GroupId { get; set; }
   int Delay { get; set; }
   string Content { get; set; }
}

Вывод выглядит примерно так:

class OutMsg
{ 
   int GroupId { get; set; }
   string Content { get; set; }
   OutMsg(InMsg in)
   {
       GroupId = in.GroupId;
       Content = Transform(in.Content);  // function omitted
   }
}

Существует несколько требований:

  • Длина задержки зависит от содержимого сообщения.
  • Каждое сообщение имеет GroupId
  • Если поступает более новое сообщение с тем же GroupId, что и у отложенного сообщения, ожидающего передачи, то первое сообщение должно быть отброшено и только второе передано после новой задержкиperiod.

Учитывая Observable и функцию отправки:

IObservable<InMsg> inMsgs = ...;

void Send(OutMsg o)
{
     ... // publishes transformed messages
}

Я понимаю, что могу использовать Select для выполнения преобразования.

void SetUp()
{
     inMsgs.Select(i => new OutMsg(i)).Subscribe(Send);
}
  • Как применить сообщение указать задержку?(Обратите внимание, что это может / должно привести к неправильной доставке сообщений.)
  • Как я могу де-дуплицировать сообщения с одинаковым идентификатором группы?
  • Способен ли Rx решить эту проблему?
  • Есть ли другой способ решения этой проблемы?

Ответы [ 2 ]

9 голосов
/ 19 января 2011

Вы можете использовать GroupBy, чтобы сделать IGroupedObservable, Delay, чтобы задержать вывод, и Switch, чтобы убедиться, что более новые значения заменяют предыдущие значения в их группе:

IObservable<InMsg> inMessages;

inMessages
    .GroupBy(msg => msg.GroupId)
    .Select(group =>
        {
            return group.Select(groupMsg => 
                {
                    TimeSpan delay = TimeSpan.FromMilliseconds(groupMsg.Delay);
                    OutMsg outMsg = new OutMsg(); // map InMsg -> OutMsg here

                    return Observable.Return(outMsg).Delay(delay);
                })
                .Switch();
        })
        .Subscribe(outMsg => Console.Write("OutMsg received"));

Aпримечание о реализации: если сгруппированное значение поступило после , сообщение отправлено (т.е. после задержки), начнется новая задержка

2 голосов
/ 02 апреля 2018

@ ответ Ричарда Сзалая почти работает для меня (используется .NET Rx 3.1.1 в .NET Framework 4.6), но мне нужно добавить .Merge() в конец выражения, чтобы объединить IObservable<IObservable<OutMsg>> результатов, например, так:

Для меня (с использованием .NET Rx 3.1.1 в .NET Framework 4.6) исправлено добавление .Merge() в конец, например:

var deduplicated = inputs
    .GroupBy(input => input)
    .Select(group =>
        group
        .Select(input => Observable.Return(input).Delay(TimeSpan.FromSeconds(5)))
        .Switch())
    .Merge(); // <-- This is added to combine the partitioned results
...