Предотвратить переполнение flux.bufferTimeout после тайм-аута - PullRequest
0 голосов
/ 11 января 2019

Я относительно новичок в реактивном программировании и Reactor. У меня есть ситуация, в которой я хочу bufferTimeout значения в моем потоке, в то время как , держа его под моим контролем (без неограниченного запроса), поэтому я могу вручную запрашивать пакеты значений.

Следующий пример иллюстрирует это:

BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();

Flux<Object> flux = Flux.generate(sink -> {
    try {
        sink.next(queue.poll(10, TimeUnit.DAYS));
    }
    catch (InterruptedException e) {}
});

BaseSubscriber<List<Object>> subscriber = new BaseSubscriber<List<Object>>() {
    protected void hookOnSubscribe(Subscription subscription) {
        // Don't request unbounded
    }

    protected void hookOnNext(List<Object> value) {
        System.out.println(value);
    }
};

flux.subscribeOn(parallel())
        .log()
        .bufferTimeout(10, ofMillis(200))
        .subscribe(subscriber);

subscriber.request(1);

// Offer a partial batch of values
queue.offer(1);
queue.offer(2);
queue.offer(3);
queue.offer(4);
queue.offer(5);

// Wait for timeout, expect [1, 2, 3, 4, 5] to be printed
Thread.sleep(500); 

// Offer more values
queue.offer(6);
queue.offer(7);
queue.offer(8);
queue.offer(9);
queue.offer(10);
Thread.sleep(1000);

Это вывод:

[DEBUG] (main) Using Console logging
[ INFO] (main) onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
[ INFO] (main) request(10)
[ INFO] (parallel-1) onNext(1)
[ INFO] (parallel-1) onNext(2)
[ INFO] (parallel-1) onNext(3)
[ INFO] (parallel-1) onNext(4)
[ INFO] (parallel-1) onNext(5)
[1, 2, 3, 4, 5]
[ INFO] (parallel-1) onNext(6)
[ INFO] (parallel-1) onNext(7)
[ INFO] (parallel-1) onNext(8)
[ INFO] (parallel-1) onNext(9)
[ INFO] (parallel-1) onNext(10)
reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests

Я действительно ожидал этого, потому что я понимаю, что подписчик буфера запросит 10 значений в восходящем направлении, который не знает о тайм-ауте и выдаст их все независимо. Поскольку один-единственный запрос был выполнен после истечения времени ожидания, предложенные позже значения все еще создаются и переполняются.

Интересно, возможно ли предотвратить создание оставшихся значений после истечения времени ожидания или сохранить их без потери контроля? Я пытался:

  • limitRate(1) до bufferTimeout, пытаясь сделать значения запроса буфера «по требованию». Он запрашивает один за другим, но 10 раз, потому что буфер запрашивает 10 значений.
  • onBackpressureBuffer(10), так как проблема в основном в определении противодавления, если я правильно понял. Попытка буферизовать переполненные значения из запроса на тайм-аут, но это запрашивает неограниченные значения, которых я бы хотел избежать.

Похоже, мне придется реализовать еще одну реализацию bufferTimeout, но мне сказали, что писать издателям сложно. Я что-то пропустил? Или я неправильно реагирую?

1 Ответ

0 голосов
/ 18 января 2019

Решил, внедрив собственного подписчика:

https://gist.github.com/hossomi/5edf60acb534a16c025e12e4e803d014

Он запрашивает только необходимое количество значений и буферизует полученные, пока нет активного запроса. Буфер не ограничен, поэтому может потребоваться использовать его с осторожностью или изменить его.

Скорее всего, не такой надежный, как стандартный подписчик Reactor, но у меня работает. Предложения приветствуются!

...