низкая производительность rpcProxy на основе netty - PullRequest
1 голос
/ 16 апреля 2019

Я разработал структуру rpc с именем tinywhale, которая используется для связи между поставщиками и потребителями. Ниже приведены данные для потребителя, чтобы вызвать поставщика:

Код ниже используется для создания прокси для вызова интерфейса провайдера:

public class xxxService {

@Autowired
private TinyWhaleProxy rpcProxy;

private HelloService helloService;

public String invoke(String message) throws InterruptedException {
    if (helloService == null) {
        helloService = rpcProxy.create(HelloService.class);
    }
    String result = helloService.hello(message);
    return result;
}
}

Ниже приведен код метода TinyWhaleProxy.create для создания прокси для потребителя. Из кода видно, что я использую прокси-сервер cglib для создания экземпляра Proxy. Затем в методе обратного вызова я создаю новый NettyClient для отправки запроса на NettyServer для получения ответа на вызов .:

 public <T> T create(Class<?> interfaceClass) {
    Enhancer enhancer = new Enhancer();
    enhancer.setCallback(new MethodInterceptor() {
        public Object intercept(Object obj, Method method, Object[] args, MethodProxy proxy) throws Throwable {
            NettyRequest request = new NettyRequest();
            request.setRequestId(UUID.randomUUID().toString());
            request.setClassName(method.getDeclaringClass().getName());
            request.setMethodName(method.getName());
            request.setParameterTypes(method.getParameterTypes());
            request.setParameterValues(args);
            NettyMessage nettyMessage = new NettyMessage();
            nettyMessage.setType(MessageType.SERVICE_REQ.value());
            nettyMessage.setBody(request);
            if (TinyWhaleProxy.this.serviceDiscovery != null) {
                TinyWhaleProxy.this.serverAddress = TinyWhaleProxy.this.serviceDiscovery.discover();
            }

            String[] array = TinyWhaleProxy.this.serverAddress.split(":");
            String host = array[0];
            int port = Integer.parseInt(array[1]);
            NettyClient client = new NettyClient(host, port);
            NettyMessage nettyResponse = client.send(nettyMessage);
            return nettyResponse != null ? JSON.toJSONString(nettyResponse.getBody()) : null;
        }
    });
    enhancer.setInterfaces(new Class[]{interfaceClass});
    T cglibProxy = enhancer.create();
    return cglibProxy;
}

Ниже приведен код NettyClient и его метод отправки, вы можете видеть, что я использовал ClientHandler для обработки отправки сообщений:

public class NettyClient {
private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);
private ClientHandler clientHandler = new ClientHandler();
private EventLoopGroup group = new NioEventLoopGroup();
private Bootstrap bootstrap = new Bootstrap();
private Channel clientChannel;

public NettyClient(String host, int port) throws InterruptedException {
    ((Bootstrap)((Bootstrap)((Bootstrap)this.bootstrap.group(this.group)).channel(NioSocketChannel.class)).option(ChannelOption.TCP_NODELAY, true)).handler(new ChannelInitializer<SocketChannel>() {
        protected void initChannel(SocketChannel channel) throws Exception {
            channel.pipeline().addLast("idleStateHandler", new IdleStateHandler(5, 5, 12));
            channel.pipeline().addLast("nettyMessageDecoder", new NettyMessageDecoder(1048576, 4, 4));
            channel.pipeline().addLast("nettyMessageEncoder", new NettyMessageEncoder());
            channel.pipeline().addLast("heartBeatHandler", new HeartBeatRequestHandler());
            channel.pipeline().addLast("clientHandler", NettyClient.this.clientHandler);
            channel.pipeline().addLast("loginAuthHandler", new LoginAuthRequestHandler());
        }
    });
    ChannelFuture channelFuture = this.bootstrap.connect(host, port);
    channelFuture.addListener((future) -> {
        if (future.isSuccess()) {
            logger.info("客户端[" + channelFuture.channel().localAddress().toString() + "]已连接...");
            this.clientChannel = channelFuture.channel();
        } else {
            logger.info("客户端[" + channelFuture.channel().localAddress().toString() + "]连接失败,重新连接中...");
            future.channel().close();
            this.bootstrap.connect(host, port);
        }

    });
    channelFuture.channel().closeFuture().addListener((cfl) -> {
        this.close();
        logger.info("客户端[" + channelFuture.channel().localAddress().toString() + "]已断开...");
    });
}

private void close() {
    if (this.clientChannel != null) {
        this.clientChannel.close();
    }

    if (this.group != null) {
        this.group.shutdownGracefully();
    }

}

public NettyMessage send(NettyMessage message) throws InterruptedException, ExecutionException {
    ChannelPromise promise = this.clientHandler.sendMessage(message);
    promise.await(3L, TimeUnit.SECONDS);
    return this.clientHandler.getResponse();
}

Ниже приведен код отправки сообщения из NettyClient в NettyServer:

public class ClientHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(ClientHandler.class);
private ChannelHandlerContext ctx;
private ChannelPromise promise;
private NettyMessage response;

public ClientHandler() {
}

public void channelActive(ChannelHandlerContext ctx) throws Exception {
    super.channelActive(ctx);
    this.ctx = ctx;
}

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    NettyMessage message = (NettyMessage)msg;
    if (message != null && message.getType() == MessageType.SERVICE_RESP.value()) {
        this.response = message;
        this.promise.setSuccess();
    } else {
        ctx.fireChannelRead(msg);
    }

}

public synchronized ChannelPromise sendMessage(Object message) {
    while(this.ctx == null) {
        try {
            TimeUnit.MILLISECONDS.sleep(1L);
            logger.error("等待ChannelHandlerContext实例化");
        } catch (InterruptedException var3) {
            logger.error("等待ChannelHandlerContext实例化过程中出错", var3);
        }
    }

    this.promise = this.ctx.newPromise();
    this.ctx.writeAndFlush(message);
    return this.promise;
}

public NettyMessage getResponse() {
    return this.response;
}
}

Все работает, что хорошо для меня, когда я запускаю тест:

@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
@State(Scope.Thread)
public class ProxyBenchmark {
private TinyWhaleProxy rpcProxy;

private ServiceDiscovery serviceDiscovery;


private HelloService helloService;

@Setup
public void init() {

    serviceDiscovery = new ServiceDiscovery("127.0.0.1:2181");

    rpcProxy = new TinyWhaleProxy();
    rpcProxy.setServiceDiscovery(serviceDiscovery);

    helloService = rpcProxy.create(HelloService.class);
}

@Benchmark
@GroupThreads(4)
public String test() {
    String result = helloService.hello("sdfsdfsdf");
    return result;
}

public static void main(String[] args) throws RunnerException {
    Options opt = new OptionsBuilder()
            .include(ProxyBenchmark.class.getSimpleName())
            .forks(1)
            .build();
    new Runner(opt).run();
}
}

И результат теста:

Warmup Iteration   2: 121.743 ops/s
Warmup Iteration   3: 103.496 ops/s
Warmup Iteration   4: 125.844 ops/s
Warmup Iteration   5: 179.424 ops/s
Warmup Iteration   6: 120.635 ops/s
Warmup Iteration   7: 86.545 ops/s
Warmup Iteration   8: 184.926 ops/s
Warmup Iteration   9: 136.450 ops/s
Warmup Iteration  10: 169.146 ops/s
Warmup Iteration  11: 161.570 ops/s
Warmup Iteration  12: 157.327 ops/s
Warmup Iteration  13: 137.892 ops/s
Warmup Iteration  14: 120.632 ops/s
Warmup Iteration  15: 191.016 ops/s
Warmup Iteration  16: 183.634 ops/s
Warmup Iteration  17: 97.625 ops/s
Warmup Iteration  18: 145.066 ops/s
Warmup Iteration  19: 203.741 ops/s
Warmup Iteration  20: 111.741 ops/s
Iteration   1: 156.113 ops/s
Iteration   2: 203.618 ops/s
Iteration   3: 134.177 ops/s
Iteration   4: 121.404 ops/s
Iteration   5: 197.915 ops/s
Iteration   6: 199.298 ops/s
Iteration   7: 65.825 ops/s
Iteration   8: 184.359 ops/s
Iteration   9: 212.469 ops/s
Iteration  10: 139.439 ops/s
Iteration  11: 112.174 ops/s
Iteration  12: 199.004 ops/s
Iteration  13: 195.232 ops/s
Iteration  14: 90.157 ops/s
Iteration  15: 115.724 ops/s
Iteration  16: 191.738 ops/s
Iteration  17: 213.633 ops/s
Iteration  18: 159.058 ops/s
Iteration  19: 62.399 ops/s
Iteration  20: 165.608 ops/s
Result "com.tw.PerformanceBenchmark.test":
155.967 ±(99.9%) 42.118 ops/s [Average]
(min, avg, max) = (62.399, 155.967, 213.633), stdev = 48.503
CI (99.9%): [113.849, 198.085] (assumes normal distribution)

Производительность действительно плохая, всего 155 операций, вот во что я не могу поверить. Я проверил код и обнаружил, что приведенная ниже привязка кода заняла много времени:

NettyClient client = new NettyClient(host, port);

Какой-нибудь совет, как улучшить производительность? используя пул?

EDIT:

Поздно днем ​​я проверил класс NettyClient и обнаружил, что, отправляя сообщение на серверную часть и получая ответ от сервера, этот процесс находится в режиме синхронизации, что приводит к снижению производительности. Затем я попытался исправить этот процесс, используя пул обещания / будущего, но не помог.

Затем я проверил другие структуры rpc и обнаружил, что все эти структуры обрабатывают этот сценарий в асинхронном режиме.

Так что я считаю, что мои исследования верны, отправка сообщения на сервер и получение ответа в асинхронном режиме. Но я не могу найти хорошее решение. Любой совет?

...