СЦЕНА :
Я пишу эхо-клиент и сервер. Передаваемые данные - это строка:
Клиент кодирует строку и отправляет ее на сервер.
Сервер извлекает данные, декодирует строку, затем кодирует полученную строку и отправляет ее обратно клиенту.
Вышеописанный процесс будет повторяться 100000 раз. (Примечание: соединение постоянное).
РАЗЛИЧНЫЕ УСЛОВИЯ :
Когда я одновременно запускаю ОДИН сервер и ДВА клиента, все в порядке, каждый клиент получает 100000 сообщений и нормально завершает свою работу.
Но когда я добавляю ExecutionHandler на сервер , а затем запускаю ОДИН сервер и ДВА клиента одновременно, один клиент никогда не завершит работу, и сетевой трафик будет равен нулю.
Пока я не могу определить ключевой момент этой проблемы, не могли бы вы дать мне несколько советов?
МОЙ КОД :
кодировщик строк, декодер строк, обработчик клиента, обработчик сервера, основной клиент, главный сервер.
// Decoder ============================================ ===========
import java.nio.charset.Charset;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.FrameDecoder;
public class Dcd extends FrameDecoder {
public static final Charset cs = Charset.forName("utf8");
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel,
ChannelBuffer buffer) throws Exception {
if (buffer.readableBytes() < 4) {
return null;
}
int headlen = 4;
int length = buffer.getInt(0);
if (buffer.readableBytes() < length + headlen) {
return null;
}
String ret = buffer.toString(headlen, length, cs);
buffer.skipBytes(length + headlen);
return ret;
}
}
// Кодировщик ================================================= ===========
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
public class Ecd extends OneToOneEncoder {
@Override
protected Object encode(ChannelHandlerContext ctx, Channel channel,
Object msg) throws Exception {
if (!(msg instanceof String)) {
return msg;
}
byte[] data = ((String) msg).getBytes();
ChannelBuffer buf = ChannelBuffers.dynamicBuffer(data.length + 4, ctx
.getChannel().getConfig().getBufferFactory());
buf.writeInt(data.length);
buf.writeBytes(data);
return buf;
}
}
// Клиентский обработчик ========================================== ============
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
/**
* Handler implementation for the echo client. It initiates the ping-pong
* traffic between the echo client and server by sending the first message to
* the server.
*/
public class EchoClientHandler extends SimpleChannelUpstreamHandler {
private static final Logger logger = Logger
.getLogger(EchoClientHandler.class.getName());
private final AtomicLong transferredBytes = new AtomicLong();
private final AtomicInteger counter = new AtomicInteger(0);
private final AtomicLong startTime = new AtomicLong(0);
private String dd;
/**
* Creates a client-side handler.
*/
public EchoClientHandler(String data) {
dd = data;
}
public long getTransferredBytes() {
return transferredBytes.get();
}
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
// Send the first message. Server will not send anything here
// because the firstMessage's capacity is 0.
startTime.set(System.currentTimeMillis());
Channels.write(ctx.getChannel(), dd);
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
// Send back the received message to the remote peer.
transferredBytes.addAndGet(((String) e.getMessage()).length());
int i = counter.incrementAndGet();
int N = 100000;
if (i < N) {
e.getChannel().write(e.getMessage());
} else {
ctx.getChannel().close();
System.out.println(N * 1.0
/ (System.currentTimeMillis() - startTime.get()) * 1000);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
// Close the connection when an exception is raised.
logger.log(Level.WARNING, "Unexpected exception from downstream.",
e.getCause());
e.getChannel().close();
}
}
// Клиент main ================================================ ============
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
/**
* Sends one message when a connection is open and echoes back any received data
* to the server. Simply put, the echo client initiates the ping-pong traffic
* between the echo client and server by sending the first message to the
* server.
*/
public class EchoClient {
private final String host;
private final int port;
public EchoClient(String host, int port) {
this.host = host;
this.port = port;
}
public void run() {
// Configure the client.
final ClientBootstrap bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
// Set up the pipeline factory.
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(new Dcd(), new Ecd(),
new EchoClientHandler("abcdd"));
}
});
bootstrap.setOption("sendBufferSize", 1048576);
bootstrap.setOption("receiveBufferSize", 1048576);
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("writeBufferLowWaterMark", 32 * 1024);
bootstrap.setOption("writeBufferHighWaterMark", 64 * 1024);
List<ChannelFuture> list = new ArrayList<ChannelFuture>();
for (int i = 0; i < 1; i++) {
// Start the connection attempt.
ChannelFuture future = bootstrap.connect(new InetSocketAddress(
host, port));
// Wait until the connection is closed or the connection
// attempt
// fails.
list.add(future);
}
for (ChannelFuture f : list) {
f.getChannel().getCloseFuture().awaitUninterruptibly();
}
// Shut down thread pools to exit.
bootstrap.releaseExternalResources();
}
private static void testOne() {
final String host = "192.168.0.102";
final int port = 8000;
new EchoClient(host, port).run();
}
public static void main(String[] args) throws Exception {
testOne();
}
}
// обработчик сервера ========================================== ============
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
/**
* Handler implementation for the echo server.
*/
public class EchoServerHandler extends SimpleChannelUpstreamHandler {
private static final Logger logger = Logger
.getLogger(EchoServerHandler.class.getName());
private final AtomicLong transferredBytes = new AtomicLong();
public long getTransferredBytes() {
return transferredBytes.get();
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
// Send back the received message to the remote peer.
transferredBytes.addAndGet(((String) e.getMessage()).length());
Channels.write(ctx.getChannel(), e.getMessage());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
// Close the connection when an exception is raised.
logger.log(Level.WARNING, "Unexpected exception from downstream.",
e.getCause());
e.getChannel().close();
}
}
// Главный сервер ========================================== ============
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.execution.ExecutionHandler;
import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
/**
* Echoes back any received data from a client.
*/
public class EchoServer {
private final int port;
public EchoServer(int port) {
this.port = port;
}
public void run() {
// Configure the server.
ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
System.out.println(Runtime.getRuntime().availableProcessors() * 2);
final ExecutionHandler executionHandler = new ExecutionHandler(
new OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576));
// Set up the pipeline factory.
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
System.out.println("new pipe");
return Channels.pipeline(new Dcd(), new Ecd(),
executionHandler, new EchoServerHandler());
}
});
bootstrap.setOption("child.sendBufferSize", 1048576);
bootstrap.setOption("child.receiveBufferSize", 1048576);
bootstrap.setOption("child.tcpNoDelay", true);
bootstrap.setOption("child.writeBufferLowWaterMark", 32 * 1024);
bootstrap.setOption("child.writeBufferHighWaterMark", 64 * 1024);
// Bind and start to accept incoming connections.
bootstrap.bind(new InetSocketAddress(port));
}
public static void main(String[] args) throws Exception {
int port = 8000;
new EchoServer(port).run();
}
}