У меня есть этот класс:
@Component
public class TestClass extends Verify {
private final ThreadPoolTaskExecutor processExecutor;
private final BlockingQueue<KPIData> kpiData;
@Autowired
public TestClass(
final DbAddresser dbAddresser,
final MyDao myDao,
final ThreadPoolTaskExecutor processExecutor) {
super(VerificationType.KPI, dbAddresser, myDao);
this.processExecutor = processExecutor;
kpiData = new ArrayBlockingQueue<>(20);
}
@Override
public void verification() throws Exception {
startedVerification();
verifyIndicator.setTitle("tasks");
final Thread consumerKPI = new Thread(() -> {
LOG.debug("Consumer thread started....");
try {
KPIData kpiData;
while (!(kpiData = this.kpiData.take()).isLastElement()) {
final InconsistsKPICallable callable = new InconsistsKPICallable(((KPIDataImpl) kpiData).getRes());
execDispatcher.put(processExecutor.submit(callable));
verifyIndicator.incMaxStep();
}
} catch (Exception e) {
LOG.error("Consumer thread error: ", e);
}
LOG.debug("Consumer thread finished....");
});
consumerKPI.start();
try {
dbAddresser.getKpiAll(kpiData, 5000);
final Pair<Long, Long> minMaxEntities = dbAddresser::getMinMaxEntities;
if (minMaxEntities != null) {
long currentEntityId = minMaxEntities.getLeft();
while (currentEntityId <= minMaxEntities.getRight()) {
if (stopped) {
break;
}
long fromEntityId = currentEntityId;
long toEntityId = currentEntityId + 4999;
incMaxStep();
InconsistsKPIneedUpdateCallable callable = new InconsistsKPIneedUpdateCallable(fromEntityId, toEntityId);
execDispatcher.put(processExecutor.submit(callable));
currentEntityId = toEntityId + 1;
}
}
} finally {
consumerKPI.join();
waitFinished();
finishedVerification();
}
}
public class InconsistsKPICallable implements Runnable {
private final List<Pair<Long, String>> kpiList;
InconsistsKPICallable(final List<Pair<Long, String>> kpiList) {
this.kpiList = kpiList;
}
@Override
public void run() {
try {
// do hard work with kpiList
} catch (Exception ex) {
} finally {
LOG.debug("InconsistsSmbpKPICallable finished!");
}
incCurStep();
}
}
public class InconsistsKPIneedUpdateCallable implements Runnable {
private final Long fromEntityId;
private final Long toEntityId;
InconsistsKPIneedUpdateCallable(final long fromEntityId, final long toEntityId) {
this.fromEntityId = fromEntityId;
this.toEntityId = toEntityId;
}
@Override
public void run() {
try {
// do hard work with fromEntityId and toEntityId
} catch (Exception ex) {
} finally {
LOG.debug("InconsistsKPIneedUpdateCallable finished for fromEntityId {} toEntityId {}", fromEntityId, toEntityId);
}
incCurStep();
}
}
}
Вся проблема в том, что здесь:
while (!(kpiData = this.kpiData.take()).isLastElement()) {
final InconsistsKPICallable callable = new InconsistsKPICallable(((KPIDataImpl) kpiData).getRes());
execDispatcher.put(processExecutor.submit(callable));
verifyIndicator.incMaxStep();
}
Мы очень быстро создаем задачу, тем самым засоряя всю память.Все из-за того, что задача Runnable
обрабатывается так медленно, в отличие от того, как мы их создаем.Я пытался указать размер очереди для ThreadPoolTaskExecutor
, но здесь я также опираюсь на тот факт, что мы быстро оцениваем его и получаем исключение: org.springframework.core.task.TaskRejectedException
.Подскажите, как можно реорганизовать этот код, чтобы уменьшить потребление памяти, а не откидывать задачи с исключением?Я пытался найти способы, чтобы гарантировать, что если очередь заполнена, вызывающий поток, который добавляет, просто ждет, а не падает с исключением, но адекватных решений не найдено.