Проблема с потоком при выполнении асинхронного запроса внутри ConnectableObservable - PullRequest
0 голосов
/ 21 января 2019

Итак, у меня есть ConnectableObservable, который получает строки, для каждой из строк мне нужно сделать запрос (асинхронный) и подождать, пока результат не вернется

В настоящее время я использую защелку, чтобы дождаться возвращения запроса, но наблюдаемая ситуация застревает или просто завершает первую строку, а не продолжает другие.

Я выделил проблему в этом примере

private Subscriber<? super String> stringStreamInput;

   ConnectableObservable<String> stringStream = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            stringStreamInput = subscriber;
        }
    })
            .observeOn(Schedulers.io())
            .map(new Func1<String, String>() {
                @Override
                public String call(String perro) {
                    return getPerroDetails(perro);
                }
            })
            .publish();
    stringStream.connect();

Метод, который вызывает симуляцию запроса

  private String getPerroDetails(final String perro) {
        Log.d("Hey", "Perro " + perro);

        //Simulating the query to get Perro details
        String newPerro = executeQueryForPerro(perro);

        Log.d("Hey", "NewPerro  " + newPerro);
        return newPerro;
    }

Моделирование запроса с использованием CountDownLatch

private String executeQueryForPerro(final String perro) {
        final String[] newPerro = new String[1];
        final CountDownLatch latch = new CountDownLatch(1);

        //SIMULATION OF THE QUERY
        Handler handler = new Handler();
        handler.postDelayed(new Runnable() {
            @Override
            public void run() {
                newPerro[0] = perro + " reloaded ";
            latch.countdown();
            }
        }, 1000);

        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return newPerro[0];
    }

Test

 String[] perros = new String[]{"Beagle", "Alaska", "Chihuahua", "PitBull", "RedBull"};
    for (String perro : perros) {
        stringStreamInput.onNext(perro);
    }

Какие другие подходы я могу использовать, чтобы убедиться, что запрос выполнен правильно и подождать, пока результат вернется, чтобы перейти к следующему шагу?

Есть ли какое-нибудь волшебное решение Rx для этого?

Большое спасибо

1 Ответ

0 голосов
/ 22 января 2019

Вы часто сталкиваетесь с проблемами, когда смешиваете мир RxJava с явным управлением потоками, обычно когда дело доходит до тестирования.Инструментарий RxJava достаточно гибок, чтобы предоставить практически все, что вам нужно.

Источник ваших данных должен быть чем-то вроде Subject.В вашем коде вы отправляете значения непосредственно подписчику, полностью обходя цепочку наблюдателя, что, вероятно, не то, что вы хотели.

PublishSubject<String> stringStreamInput = PublishSubject.create();

Затем измените getPerroDetails(), чтобы получить Observable, который будетв итоге вы получите результат:

Observable<String> getPerroDetails(final String perro);

Затем вы можете использовать оператор flatMap() для получения подробной информации:

Observable<String> stringStream = stringStreamInput
  .observeOn(Schedulers.io())
  .flatMap( input -> getPerroDetails(input) )
  .publish();

stringStream.connect();

Как реализован getPerroDetails()?Вы используете один из операторов RxJava для вызова асинхронного метода:

Observable<String> getPerroDetails(String perro)
{
  return Observable
           .just( perro + " reloaded" )
           .delay(1000, TimeUnit.MILLISECONDS));
}
...