В настоящее время я использую Flink версии 1.6 и столкнулся с проблемой AsyncIO, из-за которой производительность не соответствует моим ожиданиям.
Я уверен, что я делаю что-то не так в своей реализации, поэтому любые советы / предложения будут оценены.
Выпуск Резюме -
Я потребляю поток идентификаторов.
Для каждого идентификатора мне нужно вызвать службу REST.
Я реализовал RichAsyncFunction, которая выполняет асинхронный вызов REST.
Вот соответствующий метод кода и метод asyncInvoke
// these are initialized in the open method
ExecutorService executorService =
ExecutorService.newFixedThreadPool(n);
CloseableHttpAsyncClient client = ...
Gson gson = ...
public void asyncInvoke(String key, final ResultFuture<Item> resultFuture) throws Exception {
executorService.submit(new Runnable() {
client.execute(new HttpGet(new URI("http://myservice/" + key)), new FutureCallback<HttpResponse>() {
@Override
public void completed(final HttpResponse response) {
System.out.println("completed successfully");
Item item = gson.fromJson(EntityUtils.toString(response.getEntity), Item.class);
resultFuture.complete(Collections.singleton(item));
}
});
});
}
В приведенной выше реализации я попытался: -
- Увеличение параллельности операции обогащения
- Увеличение количества потоков в службе исполнителя
- Используя асинхронный клиент Apache http, я попытался настроить параметры диспетчера соединений - setDefaultMaxPerRoute и setMaxTotal.
Я постоянно получаю пропускную способность около 100 запросов / сек. Сервис способен обрабатывать более 5к в секунду.
Что я делаю не так, и как я могу это улучшить?