Я хочу разобрать IObservable<Memory<bytes>>
в сообщениях.Каждая эмиссия может содержать ноль или более сообщений или объединяться с предыдущими эмиссиями для формирования ноль или более сообщений.Поэтому я использую SelectMany
(псевдокод):
IObservable<Message> messages = bytes.SelectMany(mem => {
// copy into another buffer
if(somethingWrong)
{
return Observable.Throw<Message>(new Exception("..."));
}
else if(gotCompleteMessage)
{
// QUESTION IS PRIMARILY ABOUT THIS
// create message(s)
// clear/compact buffer
// return new Observable of message(s)
}
else
{
return Observable.Empty<Message>();
}
});
Мне интересно, как это работает в отношении выделения кучи и безопасности потоков.Если я правильно понимаю, если бы на messages
была подписка без ObserveOn
, это синхронно вызвало бы OnNext
на наблюдателе, и все было бы в стеке.И если использовать ObserveOn
, то сами сообщения, но не Observable
, возвращаемые SelectMany
, будут жить в куче и должны быть поточно-ориентированными.Правильно?