Как создать Reactor Netty горячий поток - PullRequest
0 голосов
/ 19 октября 2018

Я пытаюсь найти способ создать горячий поток, в который я мог бы вставить данные одним методом, а подписчик мог бы получить данные другим методом.Мне удалось использовать WorkQueueProcessor, но я не уверен, что это правильный способ сделать это.Можно ли сделать то же самое с помощью Flux.create?Вот мой рабочий фрагмент:

  1. Вызов connect();
  2. Отправка байтовых данных на сервер, клиент получит ответ от сервера tcp и workQueueProcessor отправитданные.

    @Component
    @RequiredArgsConstructor
    public class TcpCli {
    
        @Setter
        private Connection connection;
        private NettyOutbound out;
    
        //Creation of Work Queue Processor, can a Flux.create here can do the same job ?
        private WorkQueueProcessor<String> workQueueProcessor = WorkQueueProcessor.<String>builder().build();
        public Mono<? extends Connection> connect() {
            return TcpClient.create()
                .host(tcpConfig.getHost())
                .port(tcpConfig.getPort())
                .handle(this::handleConnection)
                .connect();
        }
        public Mono<String> sendData(ByteArray data) {
            out.sendByteArray(Mono.just(data)).then().subscribe();
    
            //Get emitted data from workQueueProcessor
            return workQueueProcessor.next();
        }
        private Publisher<Void> handleConnection(NettyInbound in, NettyOutbound out) {
            this.out = out;
            in.receive().asString()
                .log("In received")
                .subscribe(str -> {
                    LOGGER.info(String.format("Inbound: %s", str));
    
                    //Emit data to workQueueProcessor
                    workQueueProcessor.onNext(str);
                });
            return out
                .neverComplete()    //keep connection alive
                .log("Never close");
        }
    }
    

Ответы [ 2 ]

0 голосов
/ 15 марта 2019

Можете ли вы объяснить, что @Setter делает здесь.Я пробую один и тот же код, и он всегда выдает

@Setter
private Connection connection;
private NettyOutbound out;
0 голосов
/ 26 декабря 2018

Если у вас есть несколько источников ввода и более одного подписчика, я думаю, что вы на правильном пути.

Если NettyInbound является единственным источником ввода для вашей пары, то вам не нужно использовать какие-либоПроцессоры.Просто подпишитесь на него.

Если у вас есть несколько источников входного сигнала для вашего потока, и у вас есть только один подписчик, в вашем случае NettyOutbound, вы можете попробовать 'UnicastProcessor', который очень легкий.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...