Как создать блокирующее противодавление с помощью rxjava Flowables? - PullRequest
0 голосов
/ 12 сентября 2018

У меня есть Flowable, который мы возвращаем в функцию, которая будет постоянно читать из базы данных и добавлять ее в Flowable.

public void scan() {
    Flowable<String> flow = Flowable.create((FlowableOnSubscribe<String>) emitter -> {
        Result result = new Result();
        while (!result.hasData()) {
            result = request.query(skip, limit);
            partialResult.getResult()
                .getFeatures().forEach(feature -> emmitter.emit(feature));
        }
    }, BackpressureStrategy.BUFFER)
        .subscribeOn(Schedulers.io());
    return flow;
}

Тогда у меня есть другой объект, который может вызвать этот метод.

myObj.scan()
        .parallel()
        .runOn(Schedulers.computation())
        .map(feature -> {
            //Heavy Computation
        })
        .sequential()
        .blockingSubscribe(msg -> {
            logger.debug("Successfully processed " + msg);
        }, (e) -> {
            logger.error("Failed to process features because of error with scan", e);
        });

Мой тяжелый вычислительный раздел может занять очень много времени. Настолько долго, что существует большая вероятность того, что запросы к базе данных загрузят всю базу данных в память до того, как потребитель завершит первую пару записей.

Я прочитал о противодавлении с помощью rxjava, но только 4 варианта заставляют меня сбрасывать данные или заменять их последними.

Есть ли способ сделать так, чтобы при вызове emmitter.emit (функция) блоки вызова вызывались до тех пор, пока в Flowable не останется больше места?

I.E Я хочу рассматривать Flowable как блокирующую очередь, в которой push будет находиться в спящем режиме, если очередь превышает емкость.

...