Нетти с многопоточностью - PullRequest
0 голосов
/ 12 мая 2019

Если метод executeTestCommand выполняется одновременно в многопоточной среде, каково поведение TestResponseHandler и executeTestCommand?Даст ли он правильный commandOutput для указанной команды testCommand?

Ниже представлен клиент Netty Test

public class TestClient {

private EventLoopGroup group;
private BlockingQueue<String> queue1 = new LinkedBlockingQueue<>();
private Channel channel;
private int commandTimeout =10000;

//below method executes one time 

private Channel createConnection(BlockingQueue<String> queue1){

    try {
        group = new NioEventLoopGroup();
        Bootstrap b = new Bootstrap();
        b.group(group1)
                .channel(NioSocketChannel.class)
                .remoteAddress(new InetSocketAddress(ip1, port1))
                .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new TestResponseHandler(queue1));
                        }});
        channel = b.connect().sync().channel();
    } catch(Exception e) {
    }
    log.info(method, "Channel created : ");
}

//Below command can execute multiple times

public String executeTestCommand(String testCommand) {
ChannelFuture lastWriteFuture = null;
String commandOutput = null;
        try {

            lastWriteFuture = channel.writeAndFlush(testCommand + "\r\n");
            commandOutput = queue1.poll(commandTimeout, TimeUnit.MILLISECONDS);
            if (lastWriteFuture != null) {
                lastWriteFuture.sync();
            }
        } catch(Exception e) {
        }
        return commandOutput;
}

Ниже приведен другой класс, который помещает ответ от удаленного узла в BlockingQueue.

public class TestResponseHandler extends  ChannelInboundHandlerAdapter {

    private BlockingQueue<String> queue;

        public TestResponseHandler(BlockingQueue<String> queue) {
        this.queue=queue;
    }

    @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {

        try {
            queue.put((String)msg);
        } catch (Exception e) { 
        }
        }   
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...