Я новичок в реактивных Службах Отдыха, и, экспериментируя с apache.cfx.cxf-rt-rs-extension-реактивных потоков, я не смог сохранить поток.
Я думал, что смогу получить его, используя противодавление и блокировку при наблюдении, но после получения первых результатов методы заканчиваются и запрос на подписку игнорируется.
Что я делаю не так?
private void asyncGet() {
System.out.println("asyncGet");
Client c = null;
try {
c = ClientBuilder.newBuilder()
.executorService(Executors.newSingleThreadExecutor(Executors.defaultThreadFactory()))
.register(FlowableRxInvokerProvider.class)
.build();
Publisher<String> publisher = c.target("http://localhost:9081/newRequest/rxget")
.request()
.rx(FlowableRxInvoker.class).get(new GenericType<String>() {
});
Subscriber<? super String> subscriber = new FlowableSubscriber<String>() {
Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(10);
}
@Override
public void onNext(String request) {
System.out.println("elaborating " + request);
int random = (int) Math.random() * 10 + 1;
//sleepFor(random);
System.out.println("onNext");
subscription.request(10);
System.out.println("requested 10");
}
@Override
public void onError(Throwable throwable) {
System.out.println(throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
};
((Flowable<String>) publisher).observeOn(Schedulers.computation(), true).blockingSubscribe(subscriber);
System.out.println("end!!!");
} finally {
if (c != null) {
c.close();
}
}
}
И что я получаю после вызова метода 5 раз:
asyncGet
asyncGet
asyncGet
asyncGet
asyncGet
init!
Number of request = 50000
elaborating Request_4
onNext
requested 10
onComplete
end!!!
elaborating Request_3
onNext
requested 10
elaborating Request_0
onNext
requested 10
elaborating Request_2
elaborating Request_1
onNext
onComplete
end!!!
onComplete
end!!!
requested 10
onNext
requested 10
onComplete
end!!!
onComplete
end!!!
Заранее благодарим за любой совет!
Bye
Fabio