Повторите буфер в rxjava - PullRequest
2 голосов
/ 14 июня 2019

Горячий Наблюдаемый испускает предметы.Я хочу загрузить эти элементы на сервер.Есть два соображения:

  1. Из-за затрат на io операций, я хочу пакетировать эти элементы и загружать в виде массива
  2. Из-за ненадежности операций io,Я хочу, чтобы неудачные загрузки партий были добавлены к следующей партии.
Uploads succeed:
1 - 2 - 3 - 4 - 5
------------------
u(1,2,3) - u(4,5)

First upload fails:
1 - 2 - 3 - 4 - 5
------------------
u(1,2,3) - u(1,2,3,4,5)

Я могу позаботиться о первой, используя оператор buffer, но не знаю, как удовлетворить вторую.требование.

1 Ответ

2 голосов
/ 15 июня 2019

Вот моя идея хранения сбоев в очереди

public class StackOverflow {

    public static void main(String[] args) {
        // store any failures that may have occurred
        LinkedBlockingQueue<String> failures = new LinkedBlockingQueue<>();

        toUpload()
                // buffer however you want
                .buffer(5)
                // here is the interesting part
                .flatMap(strings -> {
                    // add any previous failures
                    List<String> prevFailures = new ArrayList<>();
                    failures.drainTo(prevFailures);
                    strings.addAll(prevFailures);

                    return Flowable.just(strings);
                })
                .flatMapCompletable(strings -> {
                    // upload the data
                    return upload(strings).doOnError(throwable -> {
                        // if its an upload failure:
                        failures.addAll(strings);
                    });
                }).subscribe();
    }

    // whatever your source flowable is
    private static Flowable<String> toUpload() {
        return Flowable.fromIterable(Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i"));
    }

    // some upload operation
    private static Completable upload(List<String> strings) {
        return Completable.complete();
    }
}

некоторые крайние случаи здесь заключаются в том, что, если последняя текучая буферизованная группа дает сбой, она не будет повторять их. Этого можно достичь с помощью оператора retryWhen, но основная идея заключается в использовании очереди

...