Как создать несколько Flux из обратных вызовов aysnc - PullRequest
0 голосов
/ 28 января 2019

Из справочного руководства Reactor я узнал, что Flux.create() можно использовать для преобразования обратного вызова aysnc в Flux.

Однако иногда обратные вызовы имеют несколько методов для получения данных нескольких типов, предположим, у меня есть peiceкода, как показано ниже:

asrService.recognize(new Callback() {
    @Override
    public void stateChange(State state) {
        // consume state
    }

    @Override
    public void onResultData(Result result) {
        // consume result
    }
});

Как преобразовать его в два реактивных потока: Flux<State> и Flux<Result>?

1 Ответ

0 голосов
/ 28 января 2019

Одним из способов является использование некоторых процессоров, таких как DirectProcessor, вы можете создать 2 разных процессора и при событии генерировать элемент на процессор и подписать процессор, но если вы все еще хотите использовать Flux.create, вы можете сделать это следующим образом

    Flux<Object> objectFlux;

@Override
public void run(String... args) throws Exception {

    objectFlux = Flux.create(objectFluxSink ->
            asrService.recognize(new Callback() {
                @Override
                public void stateChange(State state) {
                    objectFluxSink.next(state);
                }

                @Override
                public void onResultData(Result result) {
                    objectFluxSink.next(state);
                }
            }));





}

public Flux<Result> getResult(){
 return    objectFlux.filter(o -> o instanceof Result)
            .map(o -> ((Result)o));
}

public Flux<State> geState(){
    return    objectFlux.filter(o -> o instanceof State)
            .map(o -> ((State)o));
}

Я все еще думаю, что использование процессора должно быть намного чище, и вам не нужно делать этот фильтр и приведение, но вам нужно иметь 2 процессора, как это:

        DirectProcessor <Result> resultDirectProcessor = DirectProcessor.create();
    DirectProcessor<State> stateDirectProcessor = DirectProcessor.create();
    asrService.recognize(new Callback() {
        @Override
        public void stateChange(State state) {
            stateDirectProcessor.onNext(state);
        }

        @Override
        public void onResultData(Result result) {
            resultDirectProcessor.onNext(result);
        }
    });
...