Реактор проекта: ConnectableFlux автоматическое подключение по требованию - PullRequest
5 голосов
/ 16 июня 2019

У меня есть один источник элементов данных, и я хочу поделиться этим потоком с несколькими нисходящими потоками.

Это очень похоже на пример в справочном руководстве , но я чувствуюэтот пример обманывает, вызывая .connect() вручную.В частности, я не знаю, сколько будет нисходящих подписчиков, и у меня нет контроля, чтобы звонить .connect() «в конце».Потребители должны быть в состоянии подписаться, но не должны немедленно инициировать получение данных.И затем, когда-нибудь в будущем, когда данные действительно понадобятся, они будут извлекать их по мере необходимости.

Кроме того, источник чувствителен к потреблению, поэтому его нельзя получить повторно.
Чтобы добавить к этому, онбудет очень большим, так что буферизация и повторное воспроизведение - не вариант.

В идеале, помимо всего этого, все происходит в одном потоке, поэтому нет параллелизма или ожидания.
(Предоставление оченьмалое время ожидания присоединения подписчиков нежелательно)

Мне удалось достичь почти желаемого эффекта для Monos (значения одного конечного результата):

public class CoConsumptionTest {
    @Test
    public void convenientCoConsumption() {
        // List used just for the example:
        List<Tuple2<String, String>> source = Arrays.asList(
                Tuples.of("a", "1"), Tuples.of("b", "1"), Tuples.of("c", "1"),
                Tuples.of("a", "2"), Tuples.of("b", "2"), Tuples.of("c", "2"),
                Tuples.of("a", "3"), Tuples.of("b", "3"), Tuples.of("c", "3")
        );

        // Source which is sensitive to consumption
        AtomicInteger consumedCount = new AtomicInteger(0);
        Iterator<Tuple2<String, String>> statefulIterator = new Iterator<Tuple2<String, String>>() {
            private ListIterator<Tuple2<String, String>> sourceIterator = source.listIterator();

            @Override
            public boolean hasNext() {
                return sourceIterator.hasNext();
            }

            @Override
            public Tuple2<String, String> next() {
                Tuple2<String, String> e = sourceIterator.next();
                consumedCount.incrementAndGet();
                System.out.println("Audit: " + e);
                return e;
            }
        };

        // Logic in the service:
        Flux<Tuple2<String, String>> f = Flux.fromIterable(() -> statefulIterator);
        ConnectableFlux<Tuple2<String, String>> co = f.publish();

        BiFunction<String, String, Mono<Tuple2<String, String>>> findOne = (t1, t2) ->
                co.filter(e -> e.getT1().equals(t1) && e.getT2().equals(t2))
                        .next() //gives us a Mono
                        .toProcessor() //makes it eagerly subscribe and demand from the upstream, so it wont miss emissions
                        .doOnSubscribe(s -> co.connect()); //when an actual user consumer subscribes

        // Subscribing (outside the service)
        assumeThat(consumedCount).hasValue(0);
        Mono<Tuple2<String, String>> a2 = findOne.apply("a", "2");
        Mono<Tuple2<String, String>> b1 = findOne.apply("b", "1");
        Mono<Tuple2<String, String>> c1 = findOne.apply("c", "1");
        assertThat(consumedCount).hasValue(0);

        // Data is needed
        SoftAssertions softly = new SoftAssertions();

        assertThat(a2.block()).isEqualTo(Tuples.of("a", "2"));
        softly.assertThat(consumedCount).hasValue(4); //fails

        assertThat(b1.block()).isEqualTo(Tuples.of("b", "1"));
        softly.assertThat(consumedCount).hasValue(4); //fails

        assertThat(c1.block()).isEqualTo(Tuples.of("c", "1"));
        softly.assertThat(consumedCount).hasValue(4); //fails

        softly.assertAll();
    }
}

Q1 : Я хочу знать, как контролировать спрос, вместо того, чтобы охотно просить все.Эта текущая реализация запрашивает неограниченное количество и заставляет весь источник быть истощенным за один раз.См. Ошибочные (мягкие) утверждения.

Q2 : Также я хочу знать, как этого добиться для результатов Flux, т.е. для нескольких значений после применения фильтрации, а не только для первого/следующий.(Требуется только столько, сколько необходимо)
(Попытка наивно заменить .toProcessor() на .publish().autoConnect(0), но безуспешно)

Я подозреваю, что мои два вопроса связаны, так как ответ на один поможетс другой.

...