Как правильно реализовать реактивные веб-сокеты? - PullRequest
0 голосов
/ 24 апреля 2020

Я пытаюсь сделать двоичное эхо, которое будет транслироваться всем клиентам:

public class EchoHandler implements WebSocketHandler {

    private UnicastProcessor<DataBuffer> binaryEventPublisher;
    private Flux<DataBuffer> binaryOutputEvents;

    public EchoHandler(UnicastProcessor<DataBuffer> eventPublisher, Flux<DataBuffer> events) {
        this.binaryEventPublisher = eventPublisher;
        this.binaryOutputEvents = Flux.from(events);
    }

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        BinaryWebsocketMessageSubscriber subscriber = new BinaryWebsocketMessageSubscriber(binaryEventPublisher);
        session.receive()
                .map(WebSocketMessage::getPayload)
                .subscribe(subscriber::onNext, subscriber::onError, subscriber::onComplete);
        return session.send(binaryOutputEvents.map(session::binaryMessage)); // map in Flux cannot be applied to error
    }

}

BinaryWebsocketMessageSubscriber определено здесь . EchoApplication определяется здесь .

Как карту в Flux нельзя применить к ошибке времени компиляции , которую можно исправить здесь?

...