Rx Java2 обрабатывает исключение асинхронного тайм-аута - PullRequest
0 голосов
/ 17 декабря 2018

Я работаю над одновременной операцией ввода-вывода.Код ниже является тестовым кодом для моей архитектуры.Я хочу дать тайм-аут для асинхронных операций, но после некоторых шагов поток зависает.Когда я сбрасываю тайм-аут, функционал кода работает хорошо.

Чего мне не хватает?В чем проблема?

    Scheduler mainObserveOn = Schedulers.from(Executors.newFixedThreadPool(1));
    Scheduler mainSubscribeOn = Schedulers.from(Executors.newFixedThreadPool(1));
    Scheduler flatmapObserveOn = Schedulers.from(Executors.newFixedThreadPool(2));
    Scheduler flatmapSubscribeOn = Schedulers.from(Executors.newFixedThreadPool(2));

    AtomicLong a = new AtomicLong();
    Flowable.create(emitter -> {

        while(true)
        {
            for (int i = 0; i < 100; i++)
            {
                emitter.onNext(a.incrementAndGet());
                System.out.println("Emit : "+a.get()+" | Thread : "+Thread.currentThread().getName());
            }

            Thread.sleep(100);
        }

    }, BackpressureStrategy.BUFFER)
    .observeOn(mainObserveOn)
    .flatMap(
            o -> Flowable.just(o)
                    .observeOn(flatmapObserveOn)
                    .doOnNext(o1 -> {
                        System.out.println(o1+" | Thread : "+Thread.currentThread().getName());

                        if( (long)o1 % 10 == 0 )
                        {
                            System.out.println("SLEEP");
                            Thread.sleep(10);
                        }
                    })
                    .doOnError(throwable -> throwable.printStackTrace())
                    .onExceptionResumeNext(s -> {})
                    .timeout(2, TimeUnit.MILLISECONDS)
                    .subscribeOn(flatmapSubscribeOn)
    )
    .subscribeOn(mainSubscribeOn)
    //.blockingSubscribe(System.out::println, System.err::println);
    .subscribe(System.out::println, System.err::println);


    Thread.sleep(1000000000000000000L);

Кстати, я хочу пропустить операции с ошибками, поэтому используйте пустой onExceptionResumeNext.Есть ли лучший способ для этого?

1 Ответ

0 голосов
/ 17 декабря 2018

Код тайм-аута внутри doOnNext() приведет к тому, что цепь наблюдателя будет спать в течение 10 секунд каждые 10 излучений.Позже у вас есть timeout() на 2 секунды;это сработает на десятой эмиссии, вызывая выброс onError().После этого у вас не будет кода обработки ошибок, поэтому он, скорее всего, приведет к завершению всей цепочки наблюдателей.

В результате будет получено 101 строка журнала для начальных выбросов, за которыми следуют 10 строк журнала для первых 10 выбросов, за которыми следуетпо сообщению об ошибке, вероятно Timeout.

Не ясно, что вы хотите, но это то, что вы получаете.

...