каждое тело, привет!
Я использую netty 3.1 для построения сервера диспетчеризации сокетов, который передает данные сокетов на другой сервер сокетов, поэтому я создаю клиентское соединение в обработчике netty sever, когда первое сообщение прибыло, и ожидаю, когда соединение завершено, когда приходит следующее событие messageRecv, Я просто перенести буфер с канала сервера на канал клиента. Но я считаю, что это запрещено в обработчике при использовании операции future.await * (). Если я не использую await (), потому что connectFuture синхронизируется, есть вероятность, что при поступлении следующего сообщения, но соединение не будет завершено. Я не знаю, как решить проблему.
Как я могу убедиться, что соединение с клиентом завершено до наступления следующего события messageRecv?
Прямо сейчас я просто блокирую синхронизацию двух кодов, вот так:
/**
* server handler
*/
public class ServerChannelHandler extends SimpleChannelUpstreamHandler {
private static Logger _logger = LoggerFactory.getLogger(cn.szboc.dispatch.server.netty.ServerChannelHandler.class);
public ServerChannelHandler(ProxyClientFactory clientFactory) {
this.clientFactory = clientFactory;
}
/** factory connect another server */
private ProxyClientFactory clientFactory;
/** anotherchannel */
private Channel innerChannel;
private ChannelFuture connectFuture;
private ReentrantLock connectLock = new ReentrantLock();
private Condition notComplete = connectLock.newCondition();
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
final ChannelBuffer buffer = ((ChannelBuffer) e.getMessage()).copy();
final Channel outChannel = ctx.getChannel();
// first connect
if (connectFuture == null) {
final ClientChannelHandler cch = new ClientChannelHandler(ctx.getChannel());
ProxyClient client = clientFactory.retrieveClient();
connectFuture = client.getConnectChannelFuture();
connectFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
connectLock.lock();
try {
if (future.isSuccess()) {
innerChannel = future.getChannel();
innerChannel.getPipeline().addLast("clientchannelhandler", cch);
innerChannel.write(buffer);
} else {
Channels.fireExceptionCaught(outChannel, future.getCause());
}
} finally {
notComplete.signal();
connectLock.unlock();
}
}
});
} else {
connectLock.lock();
try {
if (!connectFuture.isDone()) {
if (!notComplete.await(500, TimeUnit.MILLISECONDS)) {
throw new Exception("");
}
}
if (connectFuture.isSuccess()) {
if(innerChannel == null){
if (!notComplete.await(500, TimeUnit.MILLISECONDS)) {
throw new Exception("");
}
}
innerChannel.write(buffer);
} else {
_logger.error("");
}
} finally {
connectLock.unlock();
}
}
}