Я разработал структуру rpc с именем tinywhale, которая используется для связи между поставщиками и потребителями. Ниже приведены данные для потребителя, чтобы вызвать поставщика:
Код ниже используется для создания прокси для вызова интерфейса провайдера:
public class xxxService {
@Autowired
private TinyWhaleProxy rpcProxy;
private HelloService helloService;
public String invoke(String message) throws InterruptedException {
if (helloService == null) {
helloService = rpcProxy.create(HelloService.class);
}
String result = helloService.hello(message);
return result;
}
}
Ниже приведен код метода TinyWhaleProxy.create для создания прокси для потребителя. Из кода видно, что я использую прокси-сервер cglib для создания экземпляра Proxy. Затем в методе обратного вызова я создаю новый NettyClient для отправки запроса на NettyServer для получения ответа на вызов .:
public <T> T create(Class<?> interfaceClass) {
Enhancer enhancer = new Enhancer();
enhancer.setCallback(new MethodInterceptor() {
public Object intercept(Object obj, Method method, Object[] args, MethodProxy proxy) throws Throwable {
NettyRequest request = new NettyRequest();
request.setRequestId(UUID.randomUUID().toString());
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParameterTypes(method.getParameterTypes());
request.setParameterValues(args);
NettyMessage nettyMessage = new NettyMessage();
nettyMessage.setType(MessageType.SERVICE_REQ.value());
nettyMessage.setBody(request);
if (TinyWhaleProxy.this.serviceDiscovery != null) {
TinyWhaleProxy.this.serverAddress = TinyWhaleProxy.this.serviceDiscovery.discover();
}
String[] array = TinyWhaleProxy.this.serverAddress.split(":");
String host = array[0];
int port = Integer.parseInt(array[1]);
NettyClient client = new NettyClient(host, port);
NettyMessage nettyResponse = client.send(nettyMessage);
return nettyResponse != null ? JSON.toJSONString(nettyResponse.getBody()) : null;
}
});
enhancer.setInterfaces(new Class[]{interfaceClass});
T cglibProxy = enhancer.create();
return cglibProxy;
}
Ниже приведен код NettyClient и его метод отправки, вы можете видеть, что я использовал ClientHandler для обработки отправки сообщений:
public class NettyClient {
private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);
private ClientHandler clientHandler = new ClientHandler();
private EventLoopGroup group = new NioEventLoopGroup();
private Bootstrap bootstrap = new Bootstrap();
private Channel clientChannel;
public NettyClient(String host, int port) throws InterruptedException {
((Bootstrap)((Bootstrap)((Bootstrap)this.bootstrap.group(this.group)).channel(NioSocketChannel.class)).option(ChannelOption.TCP_NODELAY, true)).handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast("idleStateHandler", new IdleStateHandler(5, 5, 12));
channel.pipeline().addLast("nettyMessageDecoder", new NettyMessageDecoder(1048576, 4, 4));
channel.pipeline().addLast("nettyMessageEncoder", new NettyMessageEncoder());
channel.pipeline().addLast("heartBeatHandler", new HeartBeatRequestHandler());
channel.pipeline().addLast("clientHandler", NettyClient.this.clientHandler);
channel.pipeline().addLast("loginAuthHandler", new LoginAuthRequestHandler());
}
});
ChannelFuture channelFuture = this.bootstrap.connect(host, port);
channelFuture.addListener((future) -> {
if (future.isSuccess()) {
logger.info("客户端[" + channelFuture.channel().localAddress().toString() + "]已连接...");
this.clientChannel = channelFuture.channel();
} else {
logger.info("客户端[" + channelFuture.channel().localAddress().toString() + "]连接失败,重新连接中...");
future.channel().close();
this.bootstrap.connect(host, port);
}
});
channelFuture.channel().closeFuture().addListener((cfl) -> {
this.close();
logger.info("客户端[" + channelFuture.channel().localAddress().toString() + "]已断开...");
});
}
private void close() {
if (this.clientChannel != null) {
this.clientChannel.close();
}
if (this.group != null) {
this.group.shutdownGracefully();
}
}
public NettyMessage send(NettyMessage message) throws InterruptedException, ExecutionException {
ChannelPromise promise = this.clientHandler.sendMessage(message);
promise.await(3L, TimeUnit.SECONDS);
return this.clientHandler.getResponse();
}
Ниже приведен код отправки сообщения из NettyClient в NettyServer:
public class ClientHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(ClientHandler.class);
private ChannelHandlerContext ctx;
private ChannelPromise promise;
private NettyMessage response;
public ClientHandler() {
}
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
this.ctx = ctx;
}
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
NettyMessage message = (NettyMessage)msg;
if (message != null && message.getType() == MessageType.SERVICE_RESP.value()) {
this.response = message;
this.promise.setSuccess();
} else {
ctx.fireChannelRead(msg);
}
}
public synchronized ChannelPromise sendMessage(Object message) {
while(this.ctx == null) {
try {
TimeUnit.MILLISECONDS.sleep(1L);
logger.error("等待ChannelHandlerContext实例化");
} catch (InterruptedException var3) {
logger.error("等待ChannelHandlerContext实例化过程中出错", var3);
}
}
this.promise = this.ctx.newPromise();
this.ctx.writeAndFlush(message);
return this.promise;
}
public NettyMessage getResponse() {
return this.response;
}
}
Все работает, что хорошо для меня, когда я запускаю тест:
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
@State(Scope.Thread)
public class ProxyBenchmark {
private TinyWhaleProxy rpcProxy;
private ServiceDiscovery serviceDiscovery;
private HelloService helloService;
@Setup
public void init() {
serviceDiscovery = new ServiceDiscovery("127.0.0.1:2181");
rpcProxy = new TinyWhaleProxy();
rpcProxy.setServiceDiscovery(serviceDiscovery);
helloService = rpcProxy.create(HelloService.class);
}
@Benchmark
@GroupThreads(4)
public String test() {
String result = helloService.hello("sdfsdfsdf");
return result;
}
public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
.include(ProxyBenchmark.class.getSimpleName())
.forks(1)
.build();
new Runner(opt).run();
}
}
И результат теста:
Warmup Iteration 2: 121.743 ops/s
Warmup Iteration 3: 103.496 ops/s
Warmup Iteration 4: 125.844 ops/s
Warmup Iteration 5: 179.424 ops/s
Warmup Iteration 6: 120.635 ops/s
Warmup Iteration 7: 86.545 ops/s
Warmup Iteration 8: 184.926 ops/s
Warmup Iteration 9: 136.450 ops/s
Warmup Iteration 10: 169.146 ops/s
Warmup Iteration 11: 161.570 ops/s
Warmup Iteration 12: 157.327 ops/s
Warmup Iteration 13: 137.892 ops/s
Warmup Iteration 14: 120.632 ops/s
Warmup Iteration 15: 191.016 ops/s
Warmup Iteration 16: 183.634 ops/s
Warmup Iteration 17: 97.625 ops/s
Warmup Iteration 18: 145.066 ops/s
Warmup Iteration 19: 203.741 ops/s
Warmup Iteration 20: 111.741 ops/s
Iteration 1: 156.113 ops/s
Iteration 2: 203.618 ops/s
Iteration 3: 134.177 ops/s
Iteration 4: 121.404 ops/s
Iteration 5: 197.915 ops/s
Iteration 6: 199.298 ops/s
Iteration 7: 65.825 ops/s
Iteration 8: 184.359 ops/s
Iteration 9: 212.469 ops/s
Iteration 10: 139.439 ops/s
Iteration 11: 112.174 ops/s
Iteration 12: 199.004 ops/s
Iteration 13: 195.232 ops/s
Iteration 14: 90.157 ops/s
Iteration 15: 115.724 ops/s
Iteration 16: 191.738 ops/s
Iteration 17: 213.633 ops/s
Iteration 18: 159.058 ops/s
Iteration 19: 62.399 ops/s
Iteration 20: 165.608 ops/s
Result "com.tw.PerformanceBenchmark.test":
155.967 ±(99.9%) 42.118 ops/s [Average]
(min, avg, max) = (62.399, 155.967, 213.633), stdev = 48.503
CI (99.9%): [113.849, 198.085] (assumes normal distribution)
Производительность действительно плохая, всего 155 операций, вот во что я не могу поверить. Я проверил код и обнаружил, что приведенная ниже привязка кода заняла много времени:
NettyClient client = new NettyClient(host, port);
Какой-нибудь совет, как улучшить производительность? используя пул?
EDIT:
Поздно днем я проверил класс NettyClient и обнаружил, что, отправляя сообщение на серверную часть и получая ответ от сервера, этот процесс находится в режиме синхронизации, что приводит к снижению производительности. Затем я попытался исправить этот процесс, используя пул обещания / будущего, но не помог.
Затем я проверил другие структуры rpc и обнаружил, что все эти структуры обрабатывают этот сценарий в асинхронном режиме.
Так что я считаю, что мои исследования верны, отправка сообщения на сервер и получение ответа в асинхронном режиме. Но я не могу найти хорошее решение. Любой совет?