OverflowStrategy не учитывая? - PullRequest
0 голосов
/ 11 января 2020

Я создал Flux, который непрерывно генерирует новое значение интергера. У меня подписчик работает в своем собственном потоке (.publishOn(single())). Независимо от того, какую стратегию я использую (ПОСЛЕДНЮЮ или другую), я всегда получаю один и тот же результат:

*** Received 1 with thread single-1
>>> Generated 1 with thread main
>>> Generated 2 with thread main
>>> Generated 3 with thread main
>>> Generated 4 with thread main
>>> Generated 5 with thread main
>>> Generated 6 with thread main
>>> Generated 7 with thread main
>>> Generated 8 with thread main
>>> Generated 9 with thread main
*** Received 2 with thread single-1
*** Received 3 with thread single-1
*** Received 4 with thread single-1
*** Received 5 with thread single-1
*** Received 6 with thread single-1
*** Received 7 with thread single-1

Из того, что я понял, установив latest, я должен получить только последние целые числа ... некоторые целые числа должен был быть сброшен?

@Test
    @DisplayName("test")
    public void workingFlux() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        Flux<Integer> IntGenerator = Flux.create(e -> {
            AtomicInteger iteration = new AtomicInteger(1);
            while (iteration.intValue() < 10) {
                int value = iteration.getAndIncrement();
                e.next(value);
                if (value < 10) {
                    System.out.println(">>> Generated " + value + " with thread " + Thread.currentThread().getName());
                }
            }
        }, FluxSink.OverflowStrategy.DROP);

        IntGenerator.publishOn(single())
                .subscribe(new Subscriber<Integer>() {
                    private Subscription s;

                    @Override
                    public void onSubscribe(final Subscription subscription) {
                        s = subscription;
                        s.request(1);
                    }

                    @Override
                    public void onNext(final Integer integer) {
                        System.out.println("*** Received " + integer + " with thread " + Thread.currentThread().getName());
                        try {
                            Thread.sleep(10000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        s.request(1);
                    }

                    @Override
                    public void onError(final Throwable throwable) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
Flux<Integer> IntGenerator = Flux.create(e -> {
            AtomicInteger iteration = new AtomicInteger(1);
            while (iteration.intValue() < 10) {
                int value = iteration.getAndIncrement();
                e.next(value);
                if (value < 10) {
                    System.out.println(">>> Generated " + value + " with thread " + Thread.currentThread().getName());
                }
            }
        }, FluxSink.OverflowStrategy.LATEST);

        IntGenerator.publishOn(single())
                .subscribe(new Subscriber<Integer>() {
                    private Subscription s;

                    @Override
                    public void onSubscribe(final Subscription subscription) {
                        System.out.println("Subscribed");
                        s = subscription;
                        s.request(1);
                    }

                    @Override
                    public void onNext(final Integer integer) {
                        System.out.println("*** Received " + integer + " with thread " + Thread.currentThread().getName());
                        try {
                            Thread.sleep(10000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        s.request(1);
                    }

                    @Override
                    public void onError(final Throwable throwable) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

        latch.await(120L, TimeUnit.SECONDS);
        latch.await(120L, TimeUnit.SECONDS);
    }

1 Ответ

0 голосов
/ 14 января 2020

publishOn имеет внутренний буфер емкостью 32 элемента по умолчанию, поэтому вы не генерируете достаточно элементов.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...