Итак, я пытаюсь использовать реактивные элементы для перекомпоновки фрагментированных сообщений, идентифицированных по ID, и у меня возникла проблема с завершением финальной наблюдаемой. У меня есть класс Message, который состоит из Id, Total Size, Payload, Chunk Number and Type и имеет следующий код на стороне клиента:
Мне нужно рассчитать количество сообщений, которые нужно принять во время выполнения
(from messages in
(from messageArgs in Receive select Serializer.Deserialize<Message>(new MemoryStream(Encoding.UTF8.GetBytes(messageArgs.Message))))
group messages by messages.Id into grouped select grouped)
.Subscribe(g =>
{
var cache = new List<Message>();
g.TakeWhile((int) Math.Ceiling(MaxPayload/g.First().Size) < cache.Count)
.Subscribe(cache.Add,
_ => { /* Rebuild Message Parts From Cache */ });
});
Сначала я создаю сгруппированные наблюдаемые сообщения фильтрации по их уникальному идентификатору, а затем я пытаюсь кэшировать все сообщения в каждой группе, пока я не соберу их все, затем я сортирую их и собираю их вместе. Кажется, что выше блокирует g.First ().
Мне нужен способ, чтобы рассчитать число, полученное из первого (или любого) из сообщений, которые поступают, однако я испытываю трудности при этом. Любая помощь?