Блокировка Tcp-запроса с помощью Netty - PullRequest
1 голос
/ 07 апреля 2019

Я новичок в нетти, и мой английский не является родным, в первую очередь извините за эту ситуацию, я постараюсь объяснить как можно больше.

У меня есть приложение для подключения к серверу netty и отправки запроса tcp. Мы должны использовать этот сервер для блокирующих и неблокирующих операций, например;

1- Некоторые запросы будут выполняться асинхронно. Я отправлю запрос на сервер и не буду ждать ответа. В этом случае нет проблем

2- Второй случай синхронизации. Когда я отправляю запрос с идентификатором транзакции на сервер, мне приходится ждать ответа с тем же идентификатором транзакции. Я не нашел четкого примера блокировки запроса netty. Все они об обещании-будущем, и это реализовано как следующие строки, но это не потокобезопасно. Когда я использую с ~ 40tps, некоторые запросы не будут собраны с ответными сообщениями, некоторые сообщения экранируются. Например, запрошено 2400 сообщений за 1 мин. 2000 сообщений в порядке, но более или менее 400 сообщений сбежали. Я видел это 400 сообщение, возвращенное с сервера, но метод get будущего не получил его и получил время ожидания.

Конфигурация Netty Client;

Bootstrap clientBootstrap = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE,true)
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 50)
                    .remoteAddress(new InetSocketAddress(host.getIp(), host.getPort()))
                    .handler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel socketChannel) {
                            socketChannel.config().setRecvByteBufAllocator(new FixedRecvByteBufAllocator(2146));
                            ClientHandler clientHandler = new clientHandler(host);
                            socketChannel.pipeline().addLast(clientHandler);
                        }
                    });

            channelFuture = clientBootstrap.connect().sync(); 
            channelFuture.channel().closeFuture().sync();

Канал будущего слушателя;

channelFuture.addListener((GenericFutureListener<ChannelFuture>) future -> {
              channelFuture.channel().pipeline().get(ClientHandler.class).setResponseFuture(responseFuture);
                byte[] byteBuffer = message.getBytes();
                channelFuture.channel().writeAndFlush(Unpooled.copiedBuffer(byteBuffer));
            });

ResponseFuture - это пользовательское будущее для получения ответа сервера. InboundHandler, как это;

@Override
public void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf in) {
        String input = in.toString(CharsetUtil.UTF_8);
        String transactionKey = input.substring(52, 66).trim();
        if(responseFuture.get(Long.valueOf(transactionKey))!=null)
            responseFuture.get(Long.valueOf(transactionKey)).set(input);
        else
            log.info("[{}] Tcp Response map is empty",transactionKey);
        }

И, наконец, я получаю такое обещание на будущее;

Object obj = responseFuture.get(ddaTimeoutParam, TimeUnit.MILLISECONDS);
      if(obj!=null) {
          response = obj.toString();
          ddaDelta = System.currentTimeMillis()-ddaRequestStartTime;
      }

Иногда некоторые параллельные потоки работают, когда эта строка responseFuture.get(Long.valueOf(transactionKey)).set(input) работает до responseFuture.get и входы смешивают каждый идентификатор транзакции. Как я могу решить эту проблему? Могу ли я использовать другие хорошие методы Netty для отправки события, когда ответ получен.

...