Flink 1.6 Async IO - Как увеличить пропускную способность при обогащении потока, используя вызов службы REST? - PullRequest
0 голосов
/ 31 августа 2018

В настоящее время я использую Flink версии 1.6 и столкнулся с проблемой AsyncIO, из-за которой производительность не соответствует моим ожиданиям.

Я уверен, что я делаю что-то не так в своей реализации, поэтому любые советы / предложения будут оценены.

Выпуск Резюме - Я потребляю поток идентификаторов. Для каждого идентификатора мне нужно вызвать службу REST. Я реализовал RichAsyncFunction, которая выполняет асинхронный вызов REST.

Вот соответствующий метод кода и метод asyncInvoke

// these are initialized in the open method
ExecutorService executorService = 
ExecutorService.newFixedThreadPool(n);
CloseableHttpAsyncClient client = ...
Gson gson = ...

public void asyncInvoke(String key, final ResultFuture<Item> resultFuture) throws Exception {

    executorService.submit(new Runnable() {

        client.execute(new HttpGet(new URI("http://myservice/" + key)), new FutureCallback<HttpResponse>() {
             @Override
                public void completed(final HttpResponse response) {
                    System.out.println("completed successfully");
                    Item item = gson.fromJson(EntityUtils.toString(response.getEntity), Item.class);
                    resultFuture.complete(Collections.singleton(item));
                }
        });
    });

}

В приведенной выше реализации я попытался: -

  • Увеличение параллельности операции обогащения
  • Увеличение количества потоков в службе исполнителя
  • Используя асинхронный клиент Apache http, я попытался настроить параметры диспетчера соединений - setDefaultMaxPerRoute и setMaxTotal.

Я постоянно получаю пропускную способность около 100 запросов / сек. Сервис способен обрабатывать более 5к в секунду. Что я делаю не так, и как я могу это улучшить?

...