Я использую асинхронный клиент Apache HTTP для выполнения http-запросов (используя данные из шины Kafka) и обработки ответа в обратных вызовах.
Я нашел несколько интересных способов поведения обратных вызовов в своем журнале и хотел бы выяснить, почему это так.
Вот часть кода (использует данные из kafka и делает запрос http):
while (true) {
ConsumerRecords<String, MyMessage> records = kafkaConsumer.poll(1000);
if (records.isEmpty()) {
logger.info("Polling Empty ....");
continue;
}
int numberOfRecords = records.count();
final CountDownLatch batchLatch = new CountDownLatch(numberOfRecords);
for (ConsumerRecord<String, MyMessage> record: records) {
final HttpPost request = new HttpPost(endpoint);
// set headers
// set entity
request.setEntity(messageToEntity(record.value()));
httpClient.execute(request, someCallback);
}
batchLatch.await();
kafkaConsumer.commitAsync();
}
А вот пример кода обратного вызова:
public class SomeCallback {
// ...
@Override
public void completed(final HttpResponse response) {
// do something
logger.info("{} - blablabla", status, ...);
latch.countDown();
}
@Override
public void failed(final Exception ex) {
// do something
logger.error("{} - blablabla", FAILED, ..., ex);
latch.countDown();
}
@Override
public void cancelled() {
// do something
logger.error("{} - blablabla!", CANCELLED, ...);
latch.countDown();
}
}
Итак, в какой-то момент я запустил программу, и она начинает потреблять данные из kafka. Максимальное количество записей на опрос составляет 500.
И поскольку на Kafka уже есть много неиспользованных данных, программа столкнулась с высокой пропускной способностью.
Теперь журнал выглядит так:
[INFO ] 2018-04-29 04:11:29.234 [main] MyClass - Polling got records: 500
[INFO ] 2018-04-29 04:11:30.362 [I/O dispatcher 1] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 2] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 1] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 1] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 2] SUCCESS blablabla
...
...
[INFO ] 2018-04-29 06:28:35.003 [main] MyClass - Polling got records: 500
[INFO ] 2018-04-29 06:28:35.363 [I/O dispatcher 386] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 385] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 386] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 385] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 386] SUCCESS blablabla
...
[INFO ] 2018-04-29 06:31:35.003 [main] MyClass - Polling got records: 500
[INFO ] 2018-04-29 06:32:12.418 [I/O dispatcher 405] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 406] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 405] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 406] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 405] SUCCESS blablabla
...
[ERROR] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 405] FAILED blablabla! org.apache.http.ConnectionClosedException: Connection closed unexpectedly
[ERROR] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 406] FAILED blablabla! org.apache.http.ConnectionClosedException: Connection closed unexpectedly
// there the program stopped
Вот что я не могу понять:
- Почему на каждые 500 запросов (обработка ответов) есть только два диспетчера ввода-вывода? Это из-за настройки по умолчанию max 2?
- Почему число диспетчеров ввода / вывода продолжает увеличиваться на каждые 500 запросов? Обычно, что я испытал раньше, так это то, что число диспетчеров ввода / вывода будет увеличиваться, но также будет уменьшаться до 1 и 2. Я предполагаю, что: он будет использовать некоторые из предыдущих диспетчеров ввода / вывода вместо каждого время создания новых.
- Почему ConnectionClosedException в конце? Это потому, что слишком много диспетчеров ввода-вывода остановили программу?
Обновление
Спасибо за комментарии Олега, я узнал, что число диспетчеров ввода-вывода постоянно увеличивается, потому что каждый раз, когда я получаю данные от Kafka, создается новый http-клиент. Тогда будет много бездействующих клиентов, занимающих операции ввода-вывода и ресурсы. По этой же причине программа остановилась.
У меня еще остались вопросы:
- Что означает число диспетчера ввода-вывода, показанное в журналах? Это количество разных тем?
- Как я могу контролировать максимальное количество диспетчеров ввода-вывода для одного http-клиента? это диспетчер ввода / вывода?
- В чем разница между количеством диспетчеров ввода-вывода и количеством соединений?
- Как оценить количество диспетчеров ввода-вывода и количество подключений, которые мне понадобятся, исходя из машины, на которой я запускаю программу, и пропускной способности / размера данных?