Меня интересует, как flatMap управляет своими «дочерними» потоками, например, следующий код работает нормально:
private Flowable<PlcDataPackage> createIntervalPlcFlowable() {
return Flowable.interval(1, TimeUnit.SECONDS, Schedulers.computation())
.onBackpressureLatest()
.parallel()
.runOn(Schedulers.computation())
.flatMap((Function<Long, Publisher<PlcDataPackage>>) aLong -> mDataPackageFlowable)
.sequential();
}
И этот код останавливается после того, как был вызван 128 раз (это maxConcurent по умолчанию для flowable):
private ConnectableFlowable<PlcDataPackage> createConnectablePlcFlowable() {
return mPlcIntervalFlowable.onBackpressureLatest()
.subscribeOn(Schedulers.single())
.publish();
}
Подписаться:
addDisposable(mGetPlcUpdatesChanelUseCase.execute()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(plcDto -> Timber.d("plcReceiver"),
Timber::e));
UseCase:
public class GetPlcUpdatesChanelUseCase extends UseCase<PlcDto, Object> {
private final PlcRepository mPlcRepository;
public GetPlcUpdatesChanelUseCase(PlcRepository plcRepository) {
mPlcRepository = plcRepository;
}
@Override
public Flowable<PlcDto> buildFlowable(Optional<Object> optional) {
return mPlcRepository.getUpdatesChannel();
}
@Override
public boolean isParamsRequired() {
return false;
}
}
Методы репо
@Override
public Flowable<PlcDto> getUpdatesChannel() {
return mPlcCore.getPlcConnectableFlowable()
.map(mPlcInfoTopPlcDtoTransformer::transform);
}
Метод PlcCore
public ConnectableFlowable<PlcDataPackage> getPlcConnectableFlowable() {
return mConnectableFlowable;
}
И mConnectableFlowable это:
mConnectableFlowable = createConnectablePlcFlowable();
mConnectableFlowable.connect();
Итак, как я понимаю, mDataPackageFlowable создается один раз, затем он выполняется, и каждый раз, когда он создает новый «поток» для своего дочернего элемента, а после 128 выполняет только блокивсе последующие казни.
Итак, есть 3 основных вопроса:
1) Контролирует ли flatMap дочерние потоки?
2) Почему он выполняет каждый новый «запрос» в новом потоке? (Может быть, нет, скажите мне тогда)
3) В каких случаях мы можем потерять контроль над дочерними потоками.
ОТКАЗ ОТ ОТВЕТСТВЕННОСТИ: английский - мой второй язык, если что-то не ясно, спросите меня, и япопробуйте добавить пояснения.
private Flowable<PlcDataPackage> createIntervalPlcFlowable() {
return Flowable.interval(1, TimeUnit.SECONDS, Schedulers.computation())
.onBackpressureLatest()
.parallel()
.runOn(Schedulers.computation())
.sequental()
Эта комбинация не работает, она фактически удаляет 128-кратный предел вызовов flatMap, но не очищает старую внутреннюю подписку, которая приводит к утечке памяти и исключениям OOM.Вместо этого используйте какую-нибудь карту.