Параллельные потоки Java - PullRequest
0 голосов
/ 10 ноября 2019

У меня есть Gson JsonObject, в котором на каком-то вложенном уровне у меня есть JsonArray. Это моя реализация, в которой я пытаюсь распараллелить извлечение и сопоставление JsonArray целевому объекту.

private List<MerchantAggregationResponse> extractMerchantAggregationResponse(String response) {
        JsonArray aggMembers = gson.fromJson(response, JsonObject.class).getAsJsonObject(AGGREGATIONS).getAsJsonObject(MERCHANT_AGG).getAsJsonArray(BUCKETS);
        log.info("ForkJoinPool Parallelism:{}",ForkJoinPool.commonPool().getParallelism());
        return StreamSupport.stream(aggMembers.spliterator(), true).map(mapFuelMerchantAggregationsResponse).collect(Collectors.toList());
    }

    private static Function<JsonElement, MerchantAggregationResponse> mapFuelMerchantAggregationsResponse = a -> {
        log.info("Hi There");
        MerchantAggregationResponse merchantAggregationResponse = new MerchantAggregationResponse();
        merchantAggregationResponse.setMerchantName(((JsonObject) a).get(KEY).getAsString());
        merchantAggregationResponse.setAmount(((JsonObject) a).get(TXN_VALUE_AGG).getAsJsonObject().get(VALUE).getAsDouble());
        //Getting first element as there shall be only one category
        merchantAggregationResponse.setIsNroPump(((JsonObject) a).get(NRO_AGG).getAsJsonObject().get(BUCKETS).getAsJsonArray().get(0).getAsJsonObject().get(KEY_AS_STRING).getAsBoolean());
        merchantAggregationResponse.setCategory(((JsonObject) a).get(OMC_AGG).getAsJsonObject().get(BUCKETS).getAsJsonArray().get(0).getAsJsonObject().get(KEY).getAsString());
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return merchantAggregationResponse;
    };

Примечание. Я использовал в параметре StreamSupport.stream параметр parallel = true.

Isэто правильный способ реализовать это?

Кроме того, когда я пытаюсь увидеть поток, выполняющий функцию, я вижу, что тот же поток из ForkJoinPool выполняет его. Если я поставлю немного сна (10000 мс), поток ждет, а затем тот же поток выполняется после остановки (можно проверить, используя время журнала). Почему не используются все три темы?

2019-11-10 21:55:35.127  INFO -- [nio-8080-exec-1] c.test.testparallel.service.ptest.PtestService   [065b5184-3dc6-43d2-8ec5-63d276ad2bff] : ForkJoinPool Parallelism: 3

2019-11-10 21:55:35.473  INFO -- [onPool-worker-1] c.test.testparallel.service.ptest.PtestService   [] : Hi There
2019-11-10 21:55:45.478  INFO -- [onPool-worker-1] c.test.testparallel.service.ptest.PtestService   [] : Hi There
2019-11-10 21:55:55.481  INFO -- [onPool-worker-1] c.test.testparallel.service.ptest.PtestService   [] : Hi There

Заранее спасибо.

...