У меня есть один источник элементов данных, и я хочу поделиться этим потоком с несколькими нисходящими потоками.
Это очень похоже на пример в справочном руководстве , но я чувствуюэтот пример обманывает, вызывая .connect()
вручную.В частности, я не знаю, сколько будет нисходящих подписчиков, и у меня нет контроля, чтобы звонить .connect()
«в конце».Потребители должны быть в состоянии подписаться, но не должны немедленно инициировать получение данных.И затем, когда-нибудь в будущем, когда данные действительно понадобятся, они будут извлекать их по мере необходимости.
Кроме того, источник чувствителен к потреблению, поэтому его нельзя получить повторно.
Чтобы добавить к этому, онбудет очень большим, так что буферизация и повторное воспроизведение - не вариант.
В идеале, помимо всего этого, все происходит в одном потоке, поэтому нет параллелизма или ожидания.
(Предоставление оченьмалое время ожидания присоединения подписчиков нежелательно)
Мне удалось достичь почти желаемого эффекта для Monos (значения одного конечного результата):
public class CoConsumptionTest {
@Test
public void convenientCoConsumption() {
// List used just for the example:
List<Tuple2<String, String>> source = Arrays.asList(
Tuples.of("a", "1"), Tuples.of("b", "1"), Tuples.of("c", "1"),
Tuples.of("a", "2"), Tuples.of("b", "2"), Tuples.of("c", "2"),
Tuples.of("a", "3"), Tuples.of("b", "3"), Tuples.of("c", "3")
);
// Source which is sensitive to consumption
AtomicInteger consumedCount = new AtomicInteger(0);
Iterator<Tuple2<String, String>> statefulIterator = new Iterator<Tuple2<String, String>>() {
private ListIterator<Tuple2<String, String>> sourceIterator = source.listIterator();
@Override
public boolean hasNext() {
return sourceIterator.hasNext();
}
@Override
public Tuple2<String, String> next() {
Tuple2<String, String> e = sourceIterator.next();
consumedCount.incrementAndGet();
System.out.println("Audit: " + e);
return e;
}
};
// Logic in the service:
Flux<Tuple2<String, String>> f = Flux.fromIterable(() -> statefulIterator);
ConnectableFlux<Tuple2<String, String>> co = f.publish();
BiFunction<String, String, Mono<Tuple2<String, String>>> findOne = (t1, t2) ->
co.filter(e -> e.getT1().equals(t1) && e.getT2().equals(t2))
.next() //gives us a Mono
.toProcessor() //makes it eagerly subscribe and demand from the upstream, so it wont miss emissions
.doOnSubscribe(s -> co.connect()); //when an actual user consumer subscribes
// Subscribing (outside the service)
assumeThat(consumedCount).hasValue(0);
Mono<Tuple2<String, String>> a2 = findOne.apply("a", "2");
Mono<Tuple2<String, String>> b1 = findOne.apply("b", "1");
Mono<Tuple2<String, String>> c1 = findOne.apply("c", "1");
assertThat(consumedCount).hasValue(0);
// Data is needed
SoftAssertions softly = new SoftAssertions();
assertThat(a2.block()).isEqualTo(Tuples.of("a", "2"));
softly.assertThat(consumedCount).hasValue(4); //fails
assertThat(b1.block()).isEqualTo(Tuples.of("b", "1"));
softly.assertThat(consumedCount).hasValue(4); //fails
assertThat(c1.block()).isEqualTo(Tuples.of("c", "1"));
softly.assertThat(consumedCount).hasValue(4); //fails
softly.assertAll();
}
}
Q1 : Я хочу знать, как контролировать спрос, вместо того, чтобы охотно просить все.Эта текущая реализация запрашивает неограниченное количество и заставляет весь источник быть истощенным за один раз.См. Ошибочные (мягкие) утверждения.
Q2 : Также я хочу знать, как этого добиться для результатов Flux, т.е. для нескольких значений после применения фильтрации, а не только для первого/следующий.(Требуется только столько, сколько необходимо)
(Попытка наивно заменить .toProcessor()
на .publish().autoConnect(0)
, но безуспешно)
Я подозреваю, что мои два вопроса связаны, так как ответ на один поможетс другой.