Я создал Flux, который непрерывно генерирует новое значение интергера. У меня подписчик работает в своем собственном потоке (.publishOn(single())
). Независимо от того, какую стратегию я использую (ПОСЛЕДНЮЮ или другую), я всегда получаю один и тот же результат:
*** Received 1 with thread single-1
>>> Generated 1 with thread main
>>> Generated 2 with thread main
>>> Generated 3 with thread main
>>> Generated 4 with thread main
>>> Generated 5 with thread main
>>> Generated 6 with thread main
>>> Generated 7 with thread main
>>> Generated 8 with thread main
>>> Generated 9 with thread main
*** Received 2 with thread single-1
*** Received 3 with thread single-1
*** Received 4 with thread single-1
*** Received 5 with thread single-1
*** Received 6 with thread single-1
*** Received 7 with thread single-1
Из того, что я понял, установив latest
, я должен получить только последние целые числа ... некоторые целые числа должен был быть сброшен?
@Test
@DisplayName("test")
public void workingFlux() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
Flux<Integer> IntGenerator = Flux.create(e -> {
AtomicInteger iteration = new AtomicInteger(1);
while (iteration.intValue() < 10) {
int value = iteration.getAndIncrement();
e.next(value);
if (value < 10) {
System.out.println(">>> Generated " + value + " with thread " + Thread.currentThread().getName());
}
}
}, FluxSink.OverflowStrategy.DROP);
IntGenerator.publishOn(single())
.subscribe(new Subscriber<Integer>() {
private Subscription s;
@Override
public void onSubscribe(final Subscription subscription) {
s = subscription;
s.request(1);
}
@Override
public void onNext(final Integer integer) {
System.out.println("*** Received " + integer + " with thread " + Thread.currentThread().getName());
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
s.request(1);
}
@Override
public void onError(final Throwable throwable) {
}
@Override
public void onComplete() {
}
});
Flux<Integer> IntGenerator = Flux.create(e -> {
AtomicInteger iteration = new AtomicInteger(1);
while (iteration.intValue() < 10) {
int value = iteration.getAndIncrement();
e.next(value);
if (value < 10) {
System.out.println(">>> Generated " + value + " with thread " + Thread.currentThread().getName());
}
}
}, FluxSink.OverflowStrategy.LATEST);
IntGenerator.publishOn(single())
.subscribe(new Subscriber<Integer>() {
private Subscription s;
@Override
public void onSubscribe(final Subscription subscription) {
System.out.println("Subscribed");
s = subscription;
s.request(1);
}
@Override
public void onNext(final Integer integer) {
System.out.println("*** Received " + integer + " with thread " + Thread.currentThread().getName());
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
s.request(1);
}
@Override
public void onError(final Throwable throwable) {
}
@Override
public void onComplete() {
}
});
latch.await(120L, TimeUnit.SECONDS);
latch.await(120L, TimeUnit.SECONDS);
}