Как настроить поток, полностью управляемый обратным давлением, в Java Reactor? - PullRequest
0 голосов
/ 10 января 2019

У меня есть ситуация, когда мне нужно несколько работников (скажем, 2). Рабочие должны выполнить задачу, потребляющую восходящие события.

Задание выполняет список событий и имеет постоянное время, не зависящее от размера списка.

Поэтому я хочу, чтобы апстрим доставил список со всеми буферизованными событиями только при запросе, 1 список за раз.

К сожалению, большинство методов осуществляют предварительную выборку. Что происходит, что даже с помощью limitRate(1, 0) в восходящем направлении получает слишком много onRequest(1), просто для пополнения нижестоящего буфера.

Поэтому я изо всех сил пытаюсь создать буферный список только тогда, когда рабочий доступен: они обычно создаются заранее, не достигая моей цели максимизировать размер буферизованного списка.

Как мне реализовать такую ​​настройку?

Есть ли способ вообще отключить предварительную выборку?

1 Ответ

0 голосов
/ 10 января 2019

Не уверен, что правильно понял вопрос. Пример кода, который показывает, что вы делаете в настоящее время, поможет.

Один из способов не извлекать данные из источника, пока onRequest - это defer создание экземпляра Flux. Таким образом, ваш код будет выглядеть примерно так:

Flux source = Flux.defer(() -> getFluxForUpstreamSource());

Используется другой способ потребления из источника с использованием противодавления Flux.generate. Ваш код будет выглядеть примерно так:

Flux source = Flux.generate(
        UpstreamSource::getConnection,
        (connection, sink) -> {
            try {
                sink.next(connection.getNext());
            } catch (UpstreamException e) {
                sink.error(e);
            }
            return connection;
        }
);
...