Buffer
- лучший способ сделать это:
var source = new Subject<string>();
var result = source.Publish(_source => _source
.Buffer(_source.Where(s => s.EndsWith(".")))
)
.Select(l => l.Aggregate ((x, y) => x + y));
result.Subscribe(s => Console.WriteLine(s));
source.OnNext("This is ");
source.OnNext("only part of");
source.OnNext(" the message.");
source.OnNext("Not. A. Full. Message ");
source.OnNext("but end of stream anyway");
source.OnCompleted();
Buffer
принимает параметр, который указывает, где должно произойти разделение групп, которое мы указываем с помощью предложения where.Buffer
объединяет сообщения в список, который мы затем агрегируем с помощью Агрегата Linq.
EDIT :
Publish
позволяет избежать повторной подписки.Если вы удалите Publish
, решение будет выглядеть следующим образом и будет работать:
var result2 = source.Buffer(
source.Where(s => s.EndsWith("."))
)
.Select(l => l.Aggregate((x, y) => x + y));
Однако, result2
будет подписан дважды на source
, что может быть источником ошибокособенно если source
не очень хорошо реализован или ведет себя как наблюдаемый.Поэтому, когда вы дважды подписываетесь на наблюдаемое, лучше всего использовать Publish
, который, по сути, «перенаправляет» сообщения из одной подписки в несколько подписок.