Поведение, которое вы видите, именно то, что вы можете ожидать.
Чтобы кратко изложить, что делает ваше приложение, позвольте мне объяснить это словами:
У вас есть Observable
для генерации чисел и создания побочного эффекта для каждого элемента.
Затем вы группируете элементы по _ % 3
.
Далее вы делаете еще несколько побочных эффектов (спите и пишете в консоль) внутри каждой группы Observable
.
Затем вы flatMap
каждой группы Observable
, в результате чего получается один плоский Observable
.
Так почему же вы вначале видите только первую группу (где _ % 3 == 0
), печатающую материал для консоли? ***
Ответ лежит в flatMap
: при просмотре документации для Observable
вы найдете следующее описание для flatMap
:
final def flatMap[B](f: (A) ⇒ Observable[B]): Observable[B]
Alias for concatMap.
[...]
Думайте о Observable
с, как если бы вы думали о List
с в секунду: когда вы объединяете List
с, вы получите один List
, содержащий сначала элементы первого List
далее следуют элементы второго List
и т. д.
В Monix такое же поведение достигается для Observable
, ожидая, пока первая Observable
, полученная внутри операции flatMap
(читай: concatMap
), отправит сигнал «выполнено». Только тогда будет потреблен второй Observable
и т. Д.
Или, проще говоря, flatMap
заботится о последовательности произведенных Observable
с.
Но когда Observable
s в вашей flatMap
операции завершается? Для этого мы должны понять, как работает groupBy
, потому что именно там они и появились.
Чтобы groupBy
работал, хотя Observable
s лениво оценивается, он должен хранить входящие элементы в буфере. Я не уверен на 100% в этом, но если groupBy
работает так, как я думаю, то для любой сгруппированной Observable
, которая тянет следующий элемент, будет проходить исходный Observable
бесконечно, пока не найдет принадлежащий элемент в эту группу, сохраняя все предыдущие (но пока не обязательные) элементы, принадлежащие другим группам, в этом буфере для последующего использования.
Все это означает, что groupBy
не может знать, были ли найдены все элементы группы до тех пор, пока источник Observable
не сообщит о завершении, тогда он будет использовать все оставшиеся буферизованные элементы и затем сообщит о завершении группированным Observable
s. .
Проще говоря: Observable
s, генерируемые groupBy
, не завершаются, пока не завершится источник Observable
.
Когда вы соберете всю эту информацию вместе, вы поймете, что только после того, как источник Observable (ваш Observable.range(0, 130)
) был завершен, первая группа Observable
также будет завершена, и из-за flatMap
только тогда все остальные будут сгруппированы Observable
s.
Поскольку из вашего последнего вопроса я знаю, что вы пытаетесь построить веб-сокет, использование flatMap
является плохой идеей - ваш источник Observable
входящих запросов не будет никогда завершен, эффективно только обслуживая самый первый IP-адрес, с которым вы столкнетесь.
Вместо этого вы должны использовать mergeMap
. При сравнении с concatMap
mergeMap
не заботится о последовательности элементов, вместо этого применяется правило «первым пришел - первым обслужен». .
***: Когда вы дойдете до конца моего объяснения и, надеюсь, поймете, как работают groupBy
и flatMap
, вы поймете, почему я написал «в начале»!