RxJava - Сбой обработки запроса с IndexOutOfBoundsException - PullRequest
0 голосов
/ 28 мая 2018

Я использую RxJava и сталкиваюсь со следующей проблемой.

    threw exception [Request processing failed; nested exception is java.lang.IndexOutOfBoundsException: Index: 0, Size: 0] with root cause
rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: [Ljava.lang.Object;.class
    at rx.exceptions.OnErrorThrowable.addValueAsLastCause(OnErrorThrowable.java:109) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.exceptions.Exceptions.throwOrReport(Exceptions.java:190) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.internal.operators.OperatorZip$Zip.tick(OperatorZip.java:257) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onNext(OperatorZip.java:323) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onNext(OperatorOnErrorResumeNextViaFunction.java:154) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.internal.operators.OperatorSubscribeOn$1$1.onNext(OperatorSubscribeOn.java:53) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.observers.SerializedObserver.onNext(SerializedObserver.java:92) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:94) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.internal.operators.OperatorTimeoutBase$TimeoutSubscriber.onNext(OperatorTimeoutBase.java:131) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.internal.producers.SingleDelayedProducer.emit(SingleDelayedProducer.java:102) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.internal.producers.SingleDelayedProducer.setValue(SingleDelayedProducer.java:85) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.internal.operators.OnSubscribeFromCallable.call(OnSubscribeFromCallable.java:48) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.internal.operators.OnSubscribeFromCallable.call(OnSubscribeFromCallable.java:33) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.Observable.unsafeSubscribe(Observable.java:10151) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.internal.operators.OperatorSubscribeOn$1.call(OperatorSubscribeOn.java:94) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.internal.schedulers.CachedThreadScheduler$EventLoopWorker$1.call(CachedThreadScheduler.java:228) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55) ~[rxjava-1.2.0.jar!/:1.2.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_111]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_111]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_111]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[?:1.8.0_111]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_111]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_111]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_111]

Ниже приведен фрагмент того, что я пытаюсь сделать с RxJava

    Observable<A> AObservable = Observable.fromCallable(() ->
            //External Service Call
    ).timeout(800, TimeUnit.MILLISECONDS)
            .subscribeOn(Schedulers.io())
            .onErrorReturn(throwable -> {
                LOGGER.warn(format("Server did not respond within %s ms for id=%s", 800, id));
                return null;
            });

    Observable<B> BObservable = Observable.fromCallable(() ->
            //External Service Call
    ).timeout(800, TimeUnit.MILLISECONDS)
            .subscribeOn(Schedulers.io())
            .onErrorReturn( throwable -> {
                LOGGER.warn(format("Service did not respond within %s ms for id=%s", 800, Id));
                return null;
            });

    // Build Default response
    Observable<C> CObservable = Observable.fromCallable(() ->
            // Build Default one
    ).subscribeOn(Schedulers.io());


    return Observable.zip(AObservable, BObservable,CObservable,
            (AResponse, BResponse, CResponse) -> {

        // Handle response and combine them

    }).toBlocking().first();

IЯ тестирую это локально, и он работает хорошо, но когда я развернул его на aws, я столкнулся с вышеупомянутой проблемой.Также обратите внимание, что я не сталкиваюсь с проблемой для всех идентификаторов, но только для нескольких идентификаторов.Я довольно новичок в RxJava, может кто-то укажет, есть ли потенциальная проблема с асинхронным кодом.

1 Ответ

0 голосов
/ 28 мая 2018

Вы можете использовать оператор switchIfEmpty для возврата значения по умолчанию, когда истекает время ожидания любого из запросов (таким образом возвращая пустую наблюдаемую).

Observable<String> AObservable = Observable.fromCallable(() -> {
    Thread.sleep(100);
    return "response A";
}).timeout(800, TimeUnit.MILLISECONDS)
        .subscribeOn(Schedulers.io())
        .onErrorResumeNext((value) -> {
            return Observable.empty();
        });

Observable<String> BObservable = Observable.fromCallable(() -> {
    Thread.sleep(1500);
    return "response B";
}).timeout(800, TimeUnit.MILLISECONDS)
        .subscribeOn(Schedulers.io())
        .onErrorResumeNext((value) -> {
            return Observable.empty();
        });

Observable<String> CObservable = Observable.fromCallable(() -> "default response")
        .subscribeOn(Schedulers.io());

String result = Observable.zip(AObservable, BObservable,
        (AResponse, BResponse) -> AResponse + " and " + BResponse)
        .switchIfEmpty(CObservable)
        .singleElement()
        .blockingGet();

System.out.println(result);

Вы можете изменить Thread.sleep аргументы, чтобы получить разные результаты.

...