У меня есть сценарий использования, в котором мне нужно создать холодного издателя из слушателя. Я использовал методы publish и autoConnect для создания ConnectableFlux и запуска издателя на первом подписчике. Я также использовал publishOn для запуска потребителей в разных потоках. Я подписываюсь дважды с разными потребителями, потребляя данные с разной скоростью.
Проблема, как только я подписываюсь на второе время, которое является очень медленным потребителем, оно замедляет первого потребителя к его собственному ритму.
package test;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class ProblemSample {
public static void main(String[] args) {
EventProcessor myEventProcessor = new EventProcessor();
Flux<String> flux = Flux.<String>create(sink -> {
MyEventListener<String> myEventListener = new MyEventListener<String>() {
public void onDataChunk(String chunk) {
sink.next(chunk);
System.out.println("onnext request " + Thread.currentThread().getName());
}
public void processComplete() {
sink.complete();
}
};
myEventProcessor.register(myEventListener);
}).publish().autoConnect().publishOn(Schedulers.newParallel("reactor"), 1);
flux.subscribe((v) -> {
System.out.println("1111111 having received " + Thread.currentThread().getName() + " value : " + v);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("------------------subscribe second main---------------------");
flux.subscribe((v) -> {
System.out.println("22222222 having received " + Thread.currentThread().getName() + " value : " + v);
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println("main thread ended");
}
static class EventProcessor {
public void register(MyEventListener<String> myEventListener) {
for (int i = 0; i < 10; i++) {
myEventListener.onDataChunk("chunk " + i);
}
myEventListener.processComplete();
}
}
static interface MyEventListener<T> {
public void onDataChunk(String chunk);
public void processComplete();
}
}
Я ожидаю, что оба потребителя будут использовать данные каждый со своим собственным ритмом, не влияя друг на друга.