Большое потребление памяти при обработке задач Runnable - PullRequest
0 голосов
/ 21 сентября 2018

У меня есть этот класс:

@Component
public class TestClass extends Verify {

    private final ThreadPoolTaskExecutor processExecutor;
    private final BlockingQueue<KPIData> kpiData;

    @Autowired
    public TestClass(
            final DbAddresser dbAddresser,
            final MyDao myDao,
            final ThreadPoolTaskExecutor processExecutor) {
        super(VerificationType.KPI, dbAddresser, myDao);
        this.processExecutor = processExecutor;
        kpiData = new ArrayBlockingQueue<>(20);
    }

    @Override
    public void verification() throws Exception {
        startedVerification();
        verifyIndicator.setTitle("tasks");
        final Thread consumerKPI = new Thread(() -> {
            LOG.debug("Consumer thread started....");
            try {
                KPIData kpiData;
                while (!(kpiData = this.kpiData.take()).isLastElement()) {
                    final InconsistsKPICallable callable = new InconsistsKPICallable(((KPIDataImpl) kpiData).getRes());
                    execDispatcher.put(processExecutor.submit(callable));
                    verifyIndicator.incMaxStep();
                }
            } catch (Exception e) {
                LOG.error("Consumer thread error: ", e);
            }
            LOG.debug("Consumer thread finished....");
        });
        consumerKPI.start();

        try {

            dbAddresser.getKpiAll(kpiData, 5000);

            final Pair<Long, Long> minMaxEntities = dbAddresser::getMinMaxEntities;
            if (minMaxEntities != null) {
                long currentEntityId = minMaxEntities.getLeft();

                while (currentEntityId <= minMaxEntities.getRight()) {
                    if (stopped) {
                        break;
                    }

                    long fromEntityId = currentEntityId;
                    long toEntityId = currentEntityId + 4999;

                    incMaxStep();

                    InconsistsKPIneedUpdateCallable callable = new InconsistsKPIneedUpdateCallable(fromEntityId, toEntityId);
                    execDispatcher.put(processExecutor.submit(callable));
                    currentEntityId = toEntityId + 1;
                }
            }
        } finally {
            consumerKPI.join();
            waitFinished();
            finishedVerification();
        }
    }

    public class InconsistsKPICallable implements Runnable {
        private final List<Pair<Long, String>> kpiList;

        InconsistsKPICallable(final List<Pair<Long, String>> kpiList) {
            this.kpiList = kpiList;
        }

        @Override
        public void run() {
            try {
                // do hard work with kpiList
            } catch (Exception ex) {
            } finally {
                LOG.debug("InconsistsSmbpKPICallable finished!");
            }
            incCurStep();
        }
    }

    public class InconsistsKPIneedUpdateCallable implements Runnable {
        private final Long fromEntityId;
        private final Long toEntityId;

        InconsistsKPIneedUpdateCallable(final long fromEntityId, final long toEntityId) {
            this.fromEntityId = fromEntityId;
            this.toEntityId = toEntityId;
        }

        @Override
        public void run() {
            try {
                // do hard work with fromEntityId and toEntityId
            } catch (Exception ex) {
            } finally {
                LOG.debug("InconsistsKPIneedUpdateCallable finished for fromEntityId {} toEntityId {}", fromEntityId, toEntityId);
            }
            incCurStep();
        }
    }
}

Вся проблема в том, что здесь:

while (!(kpiData = this.kpiData.take()).isLastElement()) {
      final InconsistsKPICallable callable = new InconsistsKPICallable(((KPIDataImpl) kpiData).getRes());
      execDispatcher.put(processExecutor.submit(callable));
      verifyIndicator.incMaxStep();
}

Мы очень быстро создаем задачу, тем самым засоряя всю память.Все из-за того, что задача Runnable обрабатывается так медленно, в отличие от того, как мы их создаем.Я пытался указать размер очереди для ThreadPoolTaskExecutor, но здесь я также опираюсь на тот факт, что мы быстро оцениваем его и получаем исключение: org.springframework.core.task.TaskRejectedException.Подскажите, как можно реорганизовать этот код, чтобы уменьшить потребление памяти, а не откидывать задачи с исключением?Я пытался найти способы, чтобы гарантировать, что если очередь заполнена, вызывающий поток, который добавляет, просто ждет, а не падает с исключением, но адекватных решений не найдено.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...