OOM при использовании воспроизведения (selectorFoo), но не публикации (selectorFoo) - PullRequest
0 голосов
/ 15 января 2019

Это происходит с OOM:

Flowable.range(1, 5000)
        .map(__ -> new byte[1024 * 1024])
        .replay(
          fb ->
            fb.take(1)
              .concatMap(__ -> fb)
          ,1
        )
        .count()
        .toFlowable()
        .blockingSubscribe(c -> System.out.println("args = [" + c + "]"));

Это, я думаю, потому что replay удерживает выбросы из восходящего потока, хотя я бы подумал, что подсказка размера буфера 1 сделает его не… что я пропустил?

Это не сбой:

Flowable.range(1, 5000)
        .map(__ -> new byte[1024 * 1024])
        .publish(
          fb ->
            fb.take(1)
              .concatMap(first -> fb.startWith(first))
          ,1
        )
        .count()
        .toFlowable()
        .blockingSubscribe(c -> System.out.println("args = [" + c + "]"));

Но я не уверен, что мне гарантировано, что я получу ВСЕ выбросы из верхнего потока, как это ...

1 Ответ

0 голосов
/ 16 января 2019

Я исследовал это и нашел причину проблемы: ошибка в replay в RxJava 2.

В результате replay содержит ссылки на 2 подписчика, один для take, а другой для внутреннего потребителя concatMap в локальной переменной, таким образом, существует корень GC от основного потока до defunct take по-прежнему ссылается на самый первый элемент. Поскольку ограниченное воспроизведение использует связанный список, этот самый первый элемент затем продолжает ссылаться на все новые и новые элементы через свои «следующие» ссылки и в итоге исчерпывает память.

publish не сохраняет ссылки на старые значения, поэтому это не проблема.

...