У меня есть Flowable, который мы возвращаем в функцию, которая будет постоянно читать из базы данных и добавлять ее в Flowable.
public void scan() {
Flowable<String> flow = Flowable.create((FlowableOnSubscribe<String>) emitter -> {
Result result = new Result();
while (!result.hasData()) {
result = request.query(skip, limit);
partialResult.getResult()
.getFeatures().forEach(feature -> emmitter.emit(feature));
}
}, BackpressureStrategy.BUFFER)
.subscribeOn(Schedulers.io());
return flow;
}
Тогда у меня есть другой объект, который может вызвать этот метод.
myObj.scan()
.parallel()
.runOn(Schedulers.computation())
.map(feature -> {
//Heavy Computation
})
.sequential()
.blockingSubscribe(msg -> {
logger.debug("Successfully processed " + msg);
}, (e) -> {
logger.error("Failed to process features because of error with scan", e);
});
Мой тяжелый вычислительный раздел может занять очень много времени. Настолько долго, что существует большая вероятность того, что запросы к базе данных загрузят всю базу данных в память до того, как потребитель завершит первую пару записей.
Я прочитал о противодавлении с помощью rxjava, но только 4 варианта заставляют меня сбрасывать данные или заменять их последними.
Есть ли способ сделать так, чтобы при вызове emmitter.emit (функция) блоки вызова вызывались до тех пор, пока в Flowable не останется больше места?
I.E Я хочу рассматривать Flowable как блокирующую очередь, в которой push будет находиться в спящем режиме, если очередь превышает емкость.