Использование реактивов для объединения фрагментированных сообщений - PullRequest
3 голосов
/ 11 февраля 2011

Итак, я пытаюсь использовать реактивные элементы для перекомпоновки фрагментированных сообщений, идентифицированных по ID, и у меня возникла проблема с завершением финальной наблюдаемой. У меня есть класс Message, который состоит из Id, Total Size, Payload, Chunk Number and Type и имеет следующий код на стороне клиента:

Мне нужно рассчитать количество сообщений, которые нужно принять во время выполнения

(from messages in
   (from messageArgs in Receive select Serializer.Deserialize<Message>(new MemoryStream(Encoding.UTF8.GetBytes(messageArgs.Message))))
 group messages by messages.Id into grouped select grouped)
.Subscribe(g =>
{
    var cache = new List<Message>();
    g.TakeWhile((int) Math.Ceiling(MaxPayload/g.First().Size) < cache.Count)
      .Subscribe(cache.Add, 
    _ => { /* Rebuild Message Parts From Cache */ });
});

Сначала я создаю сгруппированные наблюдаемые сообщения фильтрации по их уникальному идентификатору, а затем я пытаюсь кэшировать все сообщения в каждой группе, пока я не соберу их все, затем я сортирую их и собираю их вместе. Кажется, что выше блокирует g.First ().

Мне нужен способ, чтобы рассчитать число, полученное из первого (или любого) из сообщений, которые поступают, однако я испытываю трудности при этом. Любая помощь?

1 Ответ

3 голосов
/ 11 февраля 2011

First является оператором блокировки (как еще он может вернуть T, а не IObservable<T>?)

Я думаю, что использование Scan (которое строит агрегат с течением времени) может быть тем, что вам нужно. Используя Scan, вы можете скрыть «состояние» перестройки вашего сообщения в объекте «строитель».

MessageBuilder.IsComplete возвращает true, когда весь размер полученных сообщений достигает MaxPayload (или какими бы ни были ваши требования). MessageBuilder.Build() затем возвращает восстановленное сообщение.

Я также переместил ваш код «построения сообщения» в SelectMany, который хранит встроенные сообщения в монаде.

(Извинения за переформатирование кода в методы расширения, мне трудно читать / писать смешанный синтаксис LINQ)

Receive
    .Select(messageArgs => Serializer.Deserialize<Message>(
        new MemoryStream(Encoding.UTF8.GetBytes(messageArgs.Message))))
    .GroupBy(message => message.Id)
    .SelectMany(group =>
    {
        // Use the builder to "add" message parts to
        return group.Scan(new MessageBuilder(), (builder, messagePart) =>
        {
            builder.AddPart(messagePart);

            return builder;
        })
        .SkipWhile(builder => !builder.IsComplete)
        .Select(builder => builder.Build());
    })
    .Subscribe(OnMessageReceived);
...