Async Http Client + Netty - PullRequest
       23

Async Http Client + Netty

0 голосов
/ 14 октября 2018

Я сталкиваюсь с проблемами при тестировании асинхронного http-клиента (версия 2.4.3) и получаю следующее исключение при выполнении кода, вставленного ниже.Кроме того, к вашему сведению, я новичок в Netty и async-http-client, так что извините меня, если у меня возникнет пробел в моем понимании.

exception - io.netty.channel.ConnectTimeoutException: connection timed out:

Ниже приведены мои предположения -

  • В приведенном ниже коде будет один поток (единый цикл событий).
  • keepalive обеспечит повторное использование tcp-соединений
  • Размер tcp-соединений определяется setMaxConnections & setMaxConnectionsPerHost
  • ThrottleRequestFilter гарантирует, что запрос не будет выполнен, когда все соединенияактивно используются потоками, обрабатывающими код обработчика.

Вот мой вопрос -

1] Хотя я использую ThrottleRequestFilter, почему я должен видеть тайм-аут соединения.Я предполагаю, что как только ответ обработан обработчиком, соединение должно быть доступно для повторного использования, поскольку у меня есть набор keepalive, если я что-то упустил.

2] Каков размер пула потоков по умолчанию, которыйвыполняет код AsyncHandler.

Ниже приведен мой тестовый прогон

Total Requests - N = 1000
Max Con = 1000
Max Con/Host = 1000
pcit = 60000
ka = true

, а следующие показатели:

======================================
Total num of Reqs:  1000
Concurrency :   1000
Total Time in secs: 1.0
HTTP 200 OK:    789
HTTP 200 NOT OK:    211
Total Completed :   1000
rps : 789.0
======================================

public class HttpBm {

final Metrics metrics;
long startTime = System.nanoTime();

public HttpBm(Metrics metrics) {
    this.metrics = metrics;
}

private void run(int n, int mc, int mcph, int pcit, boolean ka, String url) {

    AsyncHttpClient asyncHttpClient = Dsl.asyncHttpClient(Dsl.config()
            .addRequestFilter(new ThrottleRequestFilter(mcph))
            .setMaxConnections(mc)
            .setMaxConnectionsPerHost(mcph)
            .setKeepAlive(ka)
            .setConnectTimeout(1000)
            .setConnectionTtl(500));

    for (int r=0;r<n;r++) {

        final ListenableFuture<Response> whenResponse = asyncHttpClient.prepareGet(url).execute(new AsyncHandler<Response>() {

            private Integer status;

            public State onStatusReceived(HttpResponseStatus responseStatus) throws Exception {
                if (200 == responseStatus.getStatusCode())
                    metrics.incrHttp200OK();
                else
                    metrics.incrHttpNon200OK();
                return State.ABORT;
            }

            public State onHeadersReceived(HttpHeaders headers) throws Exception {
                return State.ABORT;
            }

            public State onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception {
                return State.ABORT;
            }

            public void onThrowable(Throwable t) {
                metrics.incrHttpNon200OK();
                t.printStackTrace();
            }

            public Response onCompleted() throws Exception {
                return null;
            }
        });
    }

    long endTime = System.nanoTime();
    boolean done = true;

    while (done) {

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        done = metrics.print(startTime, endTime);
    }

}

public static void main(String[] args) {

    int n = 1000;
    int mc = 1000;
    int mcph = 1000;
    int pcit = 60000;
    String url = "http://somehost:8080/index.html";

    if (args != null) {
        n = Integer.parseInt(args[0]);
        mc = Integer.parseInt(args[1]);
        mcph = Integer.parseInt(args[2]);
        pcit = Integer.parseInt(args[3]);
        url = args[4];
    }

    System.out.println();
    System.out.println("==================================");
    System.out.println("Url = " + url);
    System.out.println("Total Requests - N = " + n);
    System.out.println("Max Con = " + mc);
    System.out.println("Max Con/Host = " + mcph);
    System.out.println("pcit = " + pcit);
    System.out.println("ka = true");

    System.out.println("==================================");

    HttpBm httpBm = new HttpBm(new Metrics(n, mc));

    httpBm.run(n , mc, mcph, pcit, true, url);
}

}

import java.util.concurrent.atomic.AtomicInteger;

public class Metrics {

private int n;
private int c;

private AtomicInteger Http200OK = new AtomicInteger();
private AtomicInteger HttpNon200OK = new AtomicInteger();

public Metrics(int n, int c) {
    this.n = n;
    this.c = c;
}

public void incrHttp200OK() {
    Http200OK.incrementAndGet();
}

public void incrHttpNon200OK() {
    HttpNon200OK.incrementAndGet();
}

public boolean isComplete() {
    return Http200OK.get() + HttpNon200OK.get() >= n;
}

public boolean print(long startTime, long finishTime) {

    final float totalTimeSec = (finishTime - startTime) / 1000000000;
    final float rps = this.Http200OK.get() / totalTimeSec;

    System.out.println("======================================");
    System.out.println("Total num of Reqs:\t" + n);
    System.out.println("Concurrency :\t" + c);
    System.out.println("Total Time in secs:\t" +  totalTimeSec);
    System.out.println("HTTP 200 OK:\t" +  Http200OK.get());
    System.out.println("HTTP 200 NOT OK:\t" + HttpNon200OK.get());
    System.out.println("Total Completed :\t" + (Http200OK.get() + HttpNon200OK.get()));
    System.out.println("rps : " + rps);
    System.out.println("======================================");

    return isComplete();
}

}

...