Одним из способов является использование некоторых процессоров, таких как DirectProcessor, вы можете создать 2 разных процессора и при событии генерировать элемент на процессор и подписать процессор, но если вы все еще хотите использовать Flux.create, вы можете сделать это следующим образом
Flux<Object> objectFlux;
@Override
public void run(String... args) throws Exception {
objectFlux = Flux.create(objectFluxSink ->
asrService.recognize(new Callback() {
@Override
public void stateChange(State state) {
objectFluxSink.next(state);
}
@Override
public void onResultData(Result result) {
objectFluxSink.next(state);
}
}));
}
public Flux<Result> getResult(){
return objectFlux.filter(o -> o instanceof Result)
.map(o -> ((Result)o));
}
public Flux<State> geState(){
return objectFlux.filter(o -> o instanceof State)
.map(o -> ((State)o));
}
Я все еще думаю, что использование процессора должно быть намного чище, и вам не нужно делать этот фильтр и приведение, но вам нужно иметь 2 процессора, как это:
DirectProcessor <Result> resultDirectProcessor = DirectProcessor.create();
DirectProcessor<State> stateDirectProcessor = DirectProcessor.create();
asrService.recognize(new Callback() {
@Override
public void stateChange(State state) {
stateDirectProcessor.onNext(state);
}
@Override
public void onResultData(Result result) {
resultDirectProcessor.onNext(result);
}
});