количество диспетчеров ввода / вывода в apache http async client - PullRequest
0 голосов
/ 01 мая 2018

Я использую асинхронный клиент Apache HTTP для выполнения http-запросов (используя данные из шины Kafka) и обработки ответа в обратных вызовах.

Я нашел несколько интересных способов поведения обратных вызовов в своем журнале и хотел бы выяснить, почему это так.

Вот часть кода (использует данные из kafka и делает запрос http):

    while (true) {
        ConsumerRecords<String, MyMessage> records = kafkaConsumer.poll(1000);
        if (records.isEmpty()) {
            logger.info("Polling Empty ....");
            continue;
        }

        int numberOfRecords = records.count();
        final CountDownLatch batchLatch = new CountDownLatch(numberOfRecords);
        for (ConsumerRecord<String, MyMessage> record: records) {
            final HttpPost request = new HttpPost(endpoint);
            // set headers
            // set entity
            request.setEntity(messageToEntity(record.value()));
            httpClient.execute(request, someCallback);
        }

        batchLatch.await();
        kafkaConsumer.commitAsync();
    }

А вот пример кода обратного вызова:

public class SomeCallback {
    // ...
    @Override
    public void completed(final HttpResponse response) {
        // do something
        logger.info("{} - blablabla", status, ...);
        latch.countDown();
    }

    @Override
    public void failed(final Exception ex) {
        // do something
        logger.error("{} - blablabla", FAILED, ..., ex);
        latch.countDown();
    }

    @Override
    public void cancelled() {
        // do something
        logger.error("{} - blablabla!", CANCELLED, ...);
        latch.countDown();
    }
}

Итак, в какой-то момент я запустил программу, и она начинает потреблять данные из kafka. Максимальное количество записей на опрос составляет 500.

И поскольку на Kafka уже есть много неиспользованных данных, программа столкнулась с высокой пропускной способностью.

Теперь журнал выглядит так:

[INFO ] 2018-04-29 04:11:29.234 [main] MyClass - Polling got records: 500
[INFO ] 2018-04-29 04:11:30.362 [I/O dispatcher 1] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 2] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 1] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 1] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 2] SUCCESS blablabla
...
...
[INFO ] 2018-04-29 06:28:35.003 [main] MyClass - Polling got records: 500
[INFO ] 2018-04-29 06:28:35.363 [I/O dispatcher 386] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 385] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 386] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 385] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 386] SUCCESS blablabla
...
[INFO ] 2018-04-29 06:31:35.003 [main] MyClass - Polling got records: 500
[INFO ] 2018-04-29 06:32:12.418 [I/O dispatcher 405] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 406] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 405] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 406] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 405] SUCCESS blablabla
...
[ERROR] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 405] FAILED blablabla! org.apache.http.ConnectionClosedException: Connection closed unexpectedly
[ERROR] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 406] FAILED blablabla! org.apache.http.ConnectionClosedException: Connection closed unexpectedly
// there the program stopped

Вот что я не могу понять:

  1. Почему на каждые 500 запросов (обработка ответов) есть только два диспетчера ввода-вывода? Это из-за настройки по умолчанию max 2?
  2. Почему число диспетчеров ввода / вывода продолжает увеличиваться на каждые 500 запросов? Обычно, что я испытал раньше, так это то, что число диспетчеров ввода / вывода будет увеличиваться, но также будет уменьшаться до 1 и 2. Я предполагаю, что: он будет использовать некоторые из предыдущих диспетчеров ввода / вывода вместо каждого время создания новых.
  3. Почему ConnectionClosedException в конце? Это потому, что слишком много диспетчеров ввода-вывода остановили программу?

Обновление

Спасибо за комментарии Олега, я узнал, что число диспетчеров ввода-вывода постоянно увеличивается, потому что каждый раз, когда я получаю данные от Kafka, создается новый http-клиент. Тогда будет много бездействующих клиентов, занимающих операции ввода-вывода и ресурсы. По этой же причине программа остановилась.

У меня еще остались вопросы:

  1. Что означает число диспетчера ввода-вывода, показанное в журналах? Это количество разных тем?
  2. Как я могу контролировать максимальное количество диспетчеров ввода-вывода для одного http-клиента? это диспетчер ввода / вывода?
  3. В чем разница между количеством диспетчеров ввода-вывода и количеством соединений?
  4. Как оценить количество диспетчеров ввода-вывода и количество подключений, которые мне понадобятся, исходя из машины, на которой я запускаю программу, и пропускной способности / размера данных?

1 Ответ

0 голосов
/ 02 мая 2018
  1. Представляет имя потока.

  2. Свойства и поведение реактора ввода / вывода можно контролировать с помощью IOReactorConfig

  3. Небольшое количество диспетчеров ввода-вывода управляет гораздо большим числом соединений.

  4. Скорее всего, один поток диспетчеризации ввода-вывода на ядро ​​ЦП является разумным значением по умолчанию для подавляющего большинства приложений, и его не следует изменять.

...