Как передать индексы элементов в Observable.fromIterable в onNext в методе подписки? - PullRequest
0 голосов
/ 11 февраля 2020

Я пытаюсь загрузить данные и поместить их в SparseArray, используя RxJava2. Я получаю данные, вызывая URL-адрес из массива, но мне нужно, чтобы ответ был проанализирован и вставлен в SparseArray с порядком URL-адресов в массиве, поэтому мне нужно передать индекс элемента String в mUrls.getGroups()

Заранее спасибо!

@GET
Single<ResponseBody> getChannels(@Url String url);


groups = new SparseArray<>();


Observable.fromIterable(mUrls.getGroups())
    .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        //
        // How can I access the index of the String item in the array?
        //
        .subscribe(new Observer<ResponseBody>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(ResponseBody responseBody) {
                Group group = GroupParser.parseList(responseBody.byteStream(), index);


                groups.put(index, group);
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, e.getMessage());
            }

            @Override
            public void onComplete() {

            }
 });

РЕДАКТИРОВАТЬ:

Вот реализованное решение:

 Observable.defer(() -> {
                        AtomicInteger counter = new AtomicInteger();
                        return Observable.fromIterable(mUrls.getGroups())
                                .map(url -> new Pair(url, counter.getAndIncrement()));
                    }).flatMapSingle(pair ->
                            aPI.getChannels(pair.first.toString())
                                    .map(responseBody -> new Pair(responseBody, pair.second))
                                    .subscribeOn(Schedulers.io())
                    )
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Observer<Pair>() {
                        @Override
                        public void onSubscribe(Disposable d) {

                        }

                        @Override
                        public void onNext(Pair pair) {
                            Pair<ResponseBody, Integer> resultPair =  (Pair<ResponseBody, Integer>) pair;
                            Group group = GroupParser.parseList(resultPair.first.byteStream(),
                                    resultPair.second);

                            groups.put(resultPair.second, group);
                        }

                        @Override
                        public void onError(Throwable e) {
                            Log.e(TAG, "***** message: " + e.getMessage());
                        }

                        @Override
                        public void onComplete() {
                            Log.i(TAG, "***** onComplete.");
                        }
                    });

1 Ответ

0 голосов
/ 12 февраля 2020

Если вы обрабатываете URL-адреса последовательно, вы можете просто ввести поле index в Observer:

Observable.fromIterable(mUrls.getGroups())
    .concatMapSingle(url -> getChannels(url))
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<ResponseBody>() {

         int index;  // <----------------------------------------------------

         // ...

         @Override
         public void onNext(ResponseBody responseBody) {
             Group group = GroupParser.parseList(responseBody.byteStream(), index);


             groups.put(index, group);

             index++;  // <---------------------------------------------------------
         }

         // ...
    });

Однако, если вы обрабатываете URL-адреса одновременно, вам нужно соединить каждый из них URL с индексом и пометить его вместе. Например, для класса Pair:

 Observable.defer(() -> {
     AtomicInteger counter = new AtomicInteger();
     return Observable.fromIterable(mUrls.getGroups())
     .map(url -> Pair.of(url, counter.getAndIncremenet()));
 })
 .flatMapSingle(urlIndex -> 
     getChannels(urlIndex.first)
     .map(v -> Pair.of(v, urlIndex.second))
     .subscribeOn(Schedulers.io())
 )
 .observeOn(AndroidSchedulers.mainThread())
 .subscribe(new Observer<Pair<ResponseBody, Integer>>() {

         // ...

         @Override
         public void onNext(Pair<ResponseBody, Integer> pair) {
             Group group = GroupParser.parseList(pair.first.byteStream(), pair.second);


             groups.put(pair.second, group);
         }

         // ...
    });
...