В Vert.x у меня есть тема публикации, на которую я хотел бы подписаться несколькими наблюдателями. Vert.x ожидает CompletionStage в результате. Как отправить значение от наблюдателя без блокировки? Вот простой пример того, что я пытаюсь сделать.
Пример:
class TestClass {
private PublishSubject ps = PublishSubject.create();
public TestClass() {
init();
}
public Observable<Foo> init() {
Observable<Foo> foo = getDataFromInputStream();
foo.subscribe(new Subscriber<Foo> () {
public void onNext(Foo foo) {
// Basically copy all values to a PublishSubject
// since we are dealing with a Java Input Stream
ps.onNext(foo);
}
public void onError(Throwable t) {
ps.onError(t)
}
// onComplete is never called since the stream lives on 'forever'
public void onComplete() {
}
});
}
public DataFetcher<CompletionStage<Foo>> vertxCallOne() {
return env -> doWork().to(SingleInterop.get());
}
public DataFetcher<CompletionStage<Bar>> vertxCallTwo() {
return env -> doOtherWork().to(SingleInterop.get());
}
public Observable<Foo> doWork() {
ps.filter(blahBlahBlah)
.subscribe(new Subscriber<Foo> () {
public void onNext(Foo foo) {
}
public void onError(Throwable t) {
}
public void onComplete() {
}
});
return ???
}
public Observable<Bar> doOtherWork() {
// Reuse the same PublishSubject as doWork(), but with
// different filter/mapping rules.
ps.filter(blah)
.filter(blah2)
.map()
.subscribe(new Subscriber<Bar> () {
public void onNext(Bar bar) {
}
public void onError(Throwable t) {
}
public void onComplete() {
}
});
return ???
}
Любая помощь будет принята с благодарностью. Спасибо!