Как работает flatMap? - PullRequest
       5

Как работает flatMap?

0 голосов
/ 30 ноября 2018

Меня интересует, как flatMap управляет своими «дочерними» потоками, например, следующий код работает нормально:

 private Flowable<PlcDataPackage> createIntervalPlcFlowable() {
    return Flowable.interval(1, TimeUnit.SECONDS, Schedulers.computation())
            .onBackpressureLatest()
            .parallel()
            .runOn(Schedulers.computation())
            .flatMap((Function<Long, Publisher<PlcDataPackage>>) aLong -> mDataPackageFlowable)
            .sequential();
}

И этот код останавливается после того, как был вызван 128 раз (это maxConcurent по умолчанию для flowable):

  private ConnectableFlowable<PlcDataPackage> createConnectablePlcFlowable() {
    return mPlcIntervalFlowable.onBackpressureLatest()
            .subscribeOn(Schedulers.single())
            .publish();
}

Подписаться:

addDisposable(mGetPlcUpdatesChanelUseCase.execute()
                              .observeOn(AndroidSchedulers.mainThread())
                              .subscribe(plcDto -> Timber.d("plcReceiver"),
                                         Timber::e));

UseCase:

public class GetPlcUpdatesChanelUseCase extends UseCase<PlcDto, Object> {

    private final PlcRepository mPlcRepository;

    public GetPlcUpdatesChanelUseCase(PlcRepository plcRepository) {
        mPlcRepository = plcRepository;
    }

    @Override
    public Flowable<PlcDto> buildFlowable(Optional<Object> optional) {
        return mPlcRepository.getUpdatesChannel();
    }

    @Override
    public boolean isParamsRequired() {
        return false;
    }
}

Методы репо

@Override
    public Flowable<PlcDto> getUpdatesChannel() {
        return mPlcCore.getPlcConnectableFlowable()
                .map(mPlcInfoTopPlcDtoTransformer::transform);
    }

Метод PlcCore

public ConnectableFlowable<PlcDataPackage> getPlcConnectableFlowable() {
    return mConnectableFlowable;
}

И mConnectableFlowable это:

mConnectableFlowable = createConnectablePlcFlowable();
        mConnectableFlowable.connect();

Итак, как я понимаю, mDataPackageFlowable создается один раз, затем он выполняется, и каждый раз, когда он создает новый «поток» для своего дочернего элемента, а после 128 выполняет только блокивсе последующие казни.

Итак, есть 3 основных вопроса:

1) Контролирует ли flatMap дочерние потоки?

2) Почему он выполняет каждый новый «запрос» в новом потоке? (Может быть, нет, скажите мне тогда)

3) В каких случаях мы можем потерять контроль над дочерними потоками.

ОТКАЗ ОТ ОТВЕТСТВЕННОСТИ: английский - мой второй язык, если что-то не ясно, спросите меня, и япопробуйте добавить пояснения.


 private Flowable<PlcDataPackage> createIntervalPlcFlowable() {
    return Flowable.interval(1, TimeUnit.SECONDS, Schedulers.computation())
            .onBackpressureLatest()
            .parallel()
            .runOn(Schedulers.computation())
            .sequental()

Эта комбинация не работает, она фактически удаляет 128-кратный предел вызовов flatMap, но не очищает старую внутреннюю подписку, которая приводит к утечке памяти и исключениям OOM.Вместо этого используйте какую-нибудь карту.

1 Ответ

0 голосов
/ 30 ноября 2018

Для правильной работы цепочки наблюдателей необходима подписка.Когда вы используете interval() для генерации данных, вы предоставляете «горячую» наблюдаемую, которая генерирует значения самостоятельно.«Холодная» наблюдаемая будет выдавать значения только тогда, когда происходит подписка.

128 - это количество записей, которые буферизуются с помощью flatMap() до его остановки.Если есть подписка, то flatMap() будет выдавать значения ниже по потоку, которые производит внутренняя наблюдаемая, и она не остановится.

flatMap() сама не работает на конкретном планировщике, в соответствии с javadoc,Это означает, что он не манипулирует своими подписками в определенных потоках.Если вы хотите контролировать работу, выполняемую в наблюдаемой, вызванной flatMap(), то вы используете явное планирование:

observable
  .flatMap( value -> fun(value).subscribeOn( myScheduler ) )
  .subscribe();

myScheduler может, например, быть Schedulers.io(), который создает потоки при необходимости,В качестве альтернативы это может быть Executor, который вы предоставляете с фиксированным числом потоков.Я часто использовал Executor s, для которого было выделено только один, два или 48 потоков для управления разветвлением из flatMap().

Вы также можете указать параметр параллелизма для flatMap(), который сообщаетэто максимальное количество подписок, которое оно будет поддерживать.Когда flatMap() достигает максимума, он буферизует запросы до тех пор, пока не завершатся цепочки наблюдателей, на которые он подписан.

Оператор parallel() делает что-то подобное, но разделяет входящие события, отправляя их в отдельные потоки.,Опять же, у javadoc есть отличные описания вместе с хорошими картинками.

Всегда можно потерять контроль над потоками.Когда вы используете оператор RxJava, прочитайте документацию по нему.Есть две области, которые вы хотите понять.Первая область - это то, над чем работает планировщик оператора.Если в нем говорится, что он не работает с определенным планировщиком, то это напрямую не влияет на выбор потоков или их использование.Если в нем говорится, что он использует определенный планировщик, вам необходимо понять, как работает этот планировщик;всегда будет другая версия оператора, которая позволит вам выбрать планировщик по вашему выбору.

Вторая область, которую вы должны понять, это обратное давление.Вы должны понимать, что означает противодавление и как оно применяется.Это особенно важно всякий раз, когда вы переступаете границу потока, например, с помощью observeOn() или subscribeOn().

...