Как создать «надежного» абонента? - PullRequest
0 голосов
/ 06 апреля 2019

Я новичок в реактивных Службах Отдыха, и, экспериментируя с apache.cfx.cxf-rt-rs-extension-реактивных потоков, я не смог сохранить поток.

Я думал, что смогу получить его, используя противодавление и блокировку при наблюдении, но после получения первых результатов методы заканчиваются и запрос на подписку игнорируется.

Что я делаю не так?

  private void asyncGet() {
    System.out.println("asyncGet");

    Client c = null;
    try {
        c = ClientBuilder.newBuilder()
                .executorService(Executors.newSingleThreadExecutor(Executors.defaultThreadFactory()))
                .register(FlowableRxInvokerProvider.class)
                .build();

        Publisher<String> publisher = c.target("http://localhost:9081/newRequest/rxget")
                .request()
                .rx(FlowableRxInvoker.class).get(new GenericType<String>() {
                });

        Subscriber<? super String> subscriber = new FlowableSubscriber<String>() {

            Subscription subscription;

            @Override
            public void onSubscribe(Subscription subscription) {
                this.subscription = subscription;
                subscription.request(10);
            }

            @Override
            public void onNext(String request) {
                System.out.println("elaborating " + request);
                int random = (int) Math.random() * 10 + 1;

                //sleepFor(random);

                System.out.println("onNext");

                subscription.request(10);

                System.out.println("requested 10");
            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println(throwable.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        };
        ((Flowable<String>) publisher).observeOn(Schedulers.computation(), true).blockingSubscribe(subscriber);


        System.out.println("end!!!");



    } finally {
        if (c != null) {
            c.close();
        }
    }
}

И что я получаю после вызова метода 5 раз:

asyncGet
asyncGet
asyncGet
asyncGet
asyncGet
init!
Number of request = 50000
elaborating Request_4
onNext
requested 10
onComplete
end!!!
elaborating Request_3
onNext
requested 10
elaborating Request_0
onNext
requested 10
elaborating Request_2
elaborating Request_1
onNext
onComplete
end!!!
onComplete
end!!!
requested 10
onNext
requested 10
onComplete
end!!!
onComplete
end!!!

Заранее благодарим за любой совет!

Bye Fabio

...