netty: почему код в обработчике не может использовать future.await ()? - PullRequest
1 голос
/ 19 марта 2012

каждое тело, привет!

Я использую netty 3.1 для построения сервера диспетчеризации сокетов, который передает данные сокетов на другой сервер сокетов, поэтому я создаю клиентское соединение в обработчике netty sever, когда первое сообщение прибыло, и ожидаю, когда соединение завершено, когда приходит следующее событие messageRecv, Я просто перенести буфер с канала сервера на канал клиента. Но я считаю, что это запрещено в обработчике при использовании операции future.await * (). Если я не использую await (), потому что connectFuture синхронизируется, есть вероятность, что при поступлении следующего сообщения, но соединение не будет завершено. Я не знаю, как решить проблему.

Как я могу убедиться, что соединение с клиентом завершено до наступления следующего события messageRecv?

Прямо сейчас я просто блокирую синхронизацию двух кодов, вот так:

/**
 * server handler
*/
public class ServerChannelHandler extends SimpleChannelUpstreamHandler {

  private static Logger _logger = LoggerFactory.getLogger(cn.szboc.dispatch.server.netty.ServerChannelHandler.class);

  public ServerChannelHandler(ProxyClientFactory clientFactory) {
    this.clientFactory = clientFactory;
  }

/** factory connect another server */
private ProxyClientFactory clientFactory;

/** anotherchannel */
private Channel innerChannel;

private ChannelFuture connectFuture;


private ReentrantLock connectLock = new ReentrantLock();

private Condition notComplete = connectLock.newCondition();

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {

    final ChannelBuffer buffer = ((ChannelBuffer) e.getMessage()).copy();
    final Channel outChannel = ctx.getChannel();

    // first connect
    if (connectFuture == null) {

        final ClientChannelHandler cch = new ClientChannelHandler(ctx.getChannel());

        ProxyClient client = clientFactory.retrieveClient();


        connectFuture = client.getConnectChannelFuture();


        connectFuture.addListener(new ChannelFutureListener() {

            @Override
            public void operationComplete(ChannelFuture future) throws Exception {

                connectLock.lock();
                try {

                    if (future.isSuccess()) {
                        innerChannel = future.getChannel();

                        innerChannel.getPipeline().addLast("clientchannelhandler", cch);
                        innerChannel.write(buffer);
                    } else {

                        Channels.fireExceptionCaught(outChannel, future.getCause());
                    }
                } finally {

                    notComplete.signal();

                    connectLock.unlock();
                }
            }

        });

    } else {

        connectLock.lock();
        try {

            if (!connectFuture.isDone()) {

                if (!notComplete.await(500, TimeUnit.MILLISECONDS)) {

                    throw new Exception("");
                }
            }


            if (connectFuture.isSuccess()) {
                if(innerChannel == null){
                    if (!notComplete.await(500, TimeUnit.MILLISECONDS)) {

                        throw new Exception("");
                    }
                }
                innerChannel.write(buffer);
            } else {

                _logger.error("");
            }

        } finally {
            connectLock.unlock();
        }

    }

}

1 Ответ

1 голос
/ 20 марта 2012

Вы не можете, потому что вы можете завести тупик нетто. Вы также заблокируете поток IO-Worker, что плохо. Лучший способ справиться с ситуацией - поставить сообщения в очередь до завершения соединения и затем отправить их.

Другим решением было бы подключиться к прокси-клиенту по методу «channelOpen (..)», в то время как ранее было установлено значение Channel.setReadable (false). После того, как соединение выполнено, вы должны снова вызвать Channel.setReadable (true), чтобы обработать messageEvents.

Примерно так:

    @Override
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
        throws Exception {
    // Suspend incoming traffic until connected to the remote host.
    final Channel inboundChannel = e.getChannel();
    inboundChannel.setReadable(false);

    // Start the connection attempt.
    ClientBootstrap cb = new ClientBootstrap(cf);
    cb.getPipeline().addLast("handler", new OutboundHandler(e.getChannel()));
    ChannelFuture f = cb.connect(new InetSocketAddress(remoteHost, remotePort));

    outboundChannel = f.getChannel();
    f.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
                // Connection attempt succeeded:
                // Begin to accept incoming traffic.
                inboundChannel.setReadable(true);
            } else {
                // Close the connection if the connection attempt has failed.
                inboundChannel.close();
            }
        }
    });
}

См. Пример прокси для получения дополнительной информации [1].

[1] https://github.com/netty/netty/tree/3.2/src/main/java/org/jboss/netty/example/proxy

...