Я хочу использовать Reactive Extensions для преобразования некоторых сообщений и ретрансляции их после небольшой задержки.
Сообщения выглядят примерно так:
class InMsg
{
int GroupId { get; set; }
int Delay { get; set; }
string Content { get; set; }
}
Вывод выглядит примерно так:
class OutMsg
{
int GroupId { get; set; }
string Content { get; set; }
OutMsg(InMsg in)
{
GroupId = in.GroupId;
Content = Transform(in.Content); // function omitted
}
}
Существует несколько требований:
- Длина задержки зависит от содержимого сообщения.
- Каждое сообщение имеет GroupId
- Если поступает более новое сообщение с тем же GroupId, что и у отложенного сообщения, ожидающего передачи, то первое сообщение должно быть отброшено и только второе передано после новой задержкиperiod.
Учитывая Observable и функцию отправки:
IObservable<InMsg> inMsgs = ...;
void Send(OutMsg o)
{
... // publishes transformed messages
}
Я понимаю, что могу использовать Select для выполнения преобразования.
void SetUp()
{
inMsgs.Select(i => new OutMsg(i)).Subscribe(Send);
}
- Как применить сообщение указать задержку?(Обратите внимание, что это может / должно привести к неправильной доставке сообщений.)
- Как я могу де-дуплицировать сообщения с одинаковым идентификатором группы?
- Способен ли Rx решить эту проблему?
- Есть ли другой способ решения этой проблемы?