Я работаю над одновременной операцией ввода-вывода.Код ниже является тестовым кодом для моей архитектуры.Я хочу дать тайм-аут для асинхронных операций, но после некоторых шагов поток зависает.Когда я сбрасываю тайм-аут, функционал кода работает хорошо.
Чего мне не хватает?В чем проблема?
Scheduler mainObserveOn = Schedulers.from(Executors.newFixedThreadPool(1));
Scheduler mainSubscribeOn = Schedulers.from(Executors.newFixedThreadPool(1));
Scheduler flatmapObserveOn = Schedulers.from(Executors.newFixedThreadPool(2));
Scheduler flatmapSubscribeOn = Schedulers.from(Executors.newFixedThreadPool(2));
AtomicLong a = new AtomicLong();
Flowable.create(emitter -> {
while(true)
{
for (int i = 0; i < 100; i++)
{
emitter.onNext(a.incrementAndGet());
System.out.println("Emit : "+a.get()+" | Thread : "+Thread.currentThread().getName());
}
Thread.sleep(100);
}
}, BackpressureStrategy.BUFFER)
.observeOn(mainObserveOn)
.flatMap(
o -> Flowable.just(o)
.observeOn(flatmapObserveOn)
.doOnNext(o1 -> {
System.out.println(o1+" | Thread : "+Thread.currentThread().getName());
if( (long)o1 % 10 == 0 )
{
System.out.println("SLEEP");
Thread.sleep(10);
}
})
.doOnError(throwable -> throwable.printStackTrace())
.onExceptionResumeNext(s -> {})
.timeout(2, TimeUnit.MILLISECONDS)
.subscribeOn(flatmapSubscribeOn)
)
.subscribeOn(mainSubscribeOn)
//.blockingSubscribe(System.out::println, System.err::println);
.subscribe(System.out::println, System.err::println);
Thread.sleep(1000000000000000000L);
Кстати, я хочу пропустить операции с ошибками, поэтому используйте пустой onExceptionResumeNext.Есть ли лучший способ для этого?