Сбор результатов из списка фьючерсов в java - PullRequest
2 голосов
/ 01 августа 2020

Я пытаюсь использовать фьючерсы для одновременных вызовов API. Код:

private void init() throws ExecutionException, InterruptedException {
    Long start = System.currentTimeMillis();
    List<ApiResponse> responses = fetchAllUsingFuture(ids, 3);
    log.info(responses.toString());
    Long finish = System.currentTimeMillis();
    log.info(MessageFormat.format("Process duration: {0} in ms", finish-start));
}

private List<ApiResponse> fetchAllUsingFuture(List<String> ids, int threadCount) throws ExecutionException, InterruptedException {
    ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
    List<List<String>> chunks = Utils.splitToChunks(ids, threadCount);
    List<Future<List<ApiResponse>>> futures = new ArrayList<>();
    chunks.forEach(chunk -> {
        futures.add(wrapFetchInFuture(chunk));
    });
    Future<List<ApiResponse>> resultFuture = executorService.submit(() -> {
        List<ApiResponse> responses = new ArrayList<>();
        futures.forEach(future -> {
            try {
                responses.addAll(future.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        });
        return responses;
    });

    executorService.shutdown();
    return resultFuture.get();
}



private Future<List<ApiResponse>> wrapFetchInFuture(List<String> ids) {
    return new FutureTask<>(() -> {
        List<ApiResponse> responses = new ArrayList<>();
        ids.forEach(id -> {
            responses.add(fetchData(id));
        });
        return responses;
    });
}

private ApiResponse fetchData(String id) {
    ResponseEntity<ApiResponse> response = restTemplate.getForEntity(id, ApiResponse.class);
    log.info(MessageFormat.format("Fetching from {0}", id));
    ApiResponse body = response.getBody();
    log.info(MessageFormat.format("Retrieved {0}", body));
    return body;
}

Он не выполняется, приложение запускается, а затем просто ожидает. Фьючерсы не исполняются. Все советы приветствуются. PS Я знаю, что это намного проще сделать с помощью CompletableFuture, мне просто интересно, как это сделать с Futures

Ответы [ 2 ]

1 голос
/ 01 августа 2020

Похоже, вы создаете список FutureTasks, но никогда не отправляете их ExecutorService для их выполнения. Я реализовал ExecutorService с Future Object, как показано ниже, надеюсь, это поможет вам:

Уровень обслуживания:

public List<MovieDTO> searchMoviesParallel(String limit, String offset, String searchPhrase) throws Exception {

        ExecutorService executor = Executors.newFixedThreadPool(1);
        Future<List<MovieDTO>> digitoonResult = executor.submit(new DigitoonSearchTask(limit, offset, searchPhrase));
        List<MovieDTO> movieDTOList = digitoonResult.get();
        executor.shutdown();

        return movieDTOList;
    }

И моя задача поиска (класс DigitoonSearchTask) выглядит следующим образом:

public class DigitoonSearchTask implements Callable<List<MovieDTO>> {

    private String limit;
    private String offset;
    private String searchPhrase;

    private final static String digitoonSearchBaseUrl = "http://apitwo.xxx.com/partner/search/?q=";

    public DigitoonSearchTask(String limit, String offset, String searchPhrase) {
        this.limit = limit;
        this.offset = offset;
        this.searchPhrase = searchPhrase;
    }

    @Override
    public List<MovieDTO> call() throws Exception {
        List<MovieDTO> movieDTOList = new ArrayList<>();
        ObjectMapper mapper = new ObjectMapper();
        try {
            String uri = digitoonSearchBaseUrl + URLEncoder.encode(searchPhrase, "utf-8") + "&limit=" + limit + "&offset=" + offset;
            URL url = new URL(uri);
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("GET");
            conn.setRequestProperty("Accept", "application/json");
            conn.setRequestProperty("authorization", "xxxxxxxxxx");
            if (conn.getResponseCode() != 200) {
                throw new RuntimeException("Failed : HTTP error code : "
                        + conn.getResponseCode());
            }
            BufferedReader br = new BufferedReader(new InputStreamReader(
                    (conn.getInputStream())));

            String output;
            while ((output = br.readLine()) != null) {
                movieDTOList = Arrays.asList(mapper.readValue(output, MovieDTO[].class));
            }
            br.close();
            conn.disconnect();
        } catch (UnknownHostException e) {
            call();
        } catch (MalformedURLException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return movieDTOList;
    }}

учтите, что теперь у меня есть только один API, и после получения других их можно добавить в качестве другой задачи поиска на уровне обслуживания, увеличив номер потока.

1 голос
/ 01 августа 2020

В исходной версии вопроса вы создаете список из FutureTasks, но никогда не отправляете их в ExecutorService для их выполнения. Задачи никогда не завершаются, поэтому Future.get блокируется навсегда.

В обновленной версии вопроса вы поместили код, который выполняет ожидание, в службу исполнителя как задачу. FutureTasks никогда не запускается, поэтому FutureTask.get все равно будет заблокирован навсегда.

Я бы посоветовал вам изменить код в fetchAllUsingFuture на:

    List<Callable<List<ApiResponse>>> tasks = new ArrayList<>();
    chunks.forEach(chunk -> {
        tasks.add(wrapFetchInCallable(chunk));
    });
    List<Future<List<ApiResponse>>> futures = executorService.invokeAll(tasks);

, где wrapFetchInCallable создает Callable вместо FutureTask:

private static Callable<List<ApiResponse>> wrapFetchInCallable(List<String> ids) {
    return () -> {
        List<ApiResponse> responses = new ArrayList<>();
        ids.forEach(id -> {
            responses.add(fetchData(id));
        });
        return responses;
    };
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...