RxJava2 Текучий отличить последний элемент - PullRequest
1 голос
/ 19 сентября 2019

Я хочу обработать Flowable партиями и обработать последнюю партию по-другому.Есть ли лучший способ, чем использовать AtomicReference для кэширования предыдущего пакета и обработки последнего пакета в onComplete()?

      AtomicReference<List<Integer>> batchRef = new AtomicReference<>();
      Flowable.just(1, 2, 3, 4, 5, 6)
              .buffer(2)
              .concatMapCompletable(batch -> {
                 List<Integer> previousBatch = batchRef.getAndSet(batch);
                 if (previousBatch != null) {
                    System.out.println("Regular batch: " + previousBatch);
                 }
                 // Something asynchronous would go here
                 return Completable.complete();
              })
              .subscribe(() -> {
                 System.out.println("Last batch: " + batchRef.get());
              });

1 Ответ

0 голосов
/ 23 сентября 2019

То есть вы хотите реагировать на потоковые события определенным образом.Самый простой способ - создать пользовательский класс подписчика, который будет делать именно то, что вам нужно:

static class MySubscriber implements FlowableSubscriber<List<Integer>> {
    List<Integer> previousBatch;

    @Override
    public void onSubscribe(Subscription s) {
        s.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(List<Integer> batch) {
        if (previousBatch != null) {
            System.out.println("Regular batch: " + previousBatch);
        }
        previousBatch = batch;
    }

    @Override
    public void onError(Throwable throwable) {}

    @Override
    public void onComplete() {
        System.out.println("Last batch: " + previousBatch);
    }
}

public static void main(String[] args) {
    Flowable.just(1, 2, 3, 4, 5, 6)
            .buffer(2)
            .subscribe(new MySubscriber());
}

}

...