Как насчет этого:
IObservable<string> splitXmlTokensIntoSeparateLines(string s)
{
// Here, you need to split tokens into separate lines (where 'token'
// is the beginning of an Xml element). This makes it easier down
// the line for the TakeWhile operator.
return new[] { firstPart, secondPart, etc }.ToObservable();
}
bool doesTokenTerminateDocument(string s)
{
// Here, you should return whether the XML represents the end of one
// document
}
var xmlDocuments = stringObservable
.SelectMany(x => splitXmlTokensIntoSeparateLines(x))
.TakeWhile(x => doesTokenTerminateDocument(x))
.Aggregate(new StringBuilder(), (acc, x), acc.Append(x))
.Select(x => {
var ret = new XDocument();
ret.Parse(x.ToString());
return ret;
})
.Repeat()
.TakeUntil(stringObservable.Aggregate(0, (acc, _) => acc));
TakeUntil - это хак, чтобы заставить его правильно завершиться - в основном, Repeat будет продолжать повторную подписку навсегда, если мы не остановим его, сказав, что он завершится, когда stringObservable завершит работу.