Ниже мой дизайн:
Процесс загрузки: получает список записей: имя, фамилия, мобильный телефон и электронная почта
=> Подтверждает данные
=>Создает запись в db
=> Создать новый класс, который реализует Runnable для вызова внешнего API для получения дополнительных данных
=> Продолжить дальше для получения дополнительной бизнес-логики
Я используюTaskExecutor для создания нового класса, который реализует Runnable.
Я загружаю более 50 записей одновременно.
Возникает следующая проблема:
Если я загружаю 10 записей, они успешно обрабатываются.
При загрузке все данные обрабатываются правильно и создаются необходимые выполняемые потоки.
Но если я загружу 30 записей: если 1 Runnable поток (скажем, в записи 5) имеет исключение во внешнем API, то только этот поток вызывается для всех оставшихся 25 раз с теми же данными.Идентификатор потока также тот же
Мой вопрос заключается в том, почему один и тот же исполняемый поток вызывается 25 раз.Разве исполнитель задачи не должен вызывать новый runnable для каждой записи?Разве JMRequestProcessor не должен завершаться после исключения?
Любая помощь / совет приветствуются.
Ниже приведен код потока и процессора загрузки.
package com.powerhire.uploadprocessor;
@Scope("prototype")
public class JMRequestProcessor implements Runnable {
protected static final Logger logger = LogManager.getLogger(JMRequestProcessor.class);
@Autowired
JmJobService jmJobService;
.
.
.
private Candidate candidate;
.
.
.
.
@Override
public void run() {
try {
Date startTime = new Date();
logger.info("Processing JM Record " + candidate.getEmail());
jmJobService.sendJmCreateUpdateCandidateRequest(candidate, roleTypeId, campaignName, ccm);
logger.info("Finished processing JM Record " + candidate.getEmail() + " ~ " + (Calendar.getInstance().getTime().getTime() - startTime.getTime()));
} catch (Exception e) {
e.printStackTrace();
logger.error("Error executing the JM Request " + e.getMessage());
}
}
}
Подходящий класс
package com.powerhire.uploadprocessor.impl;
public class UploadDataProcessorImpl implements UploadDataProcessor {
protected static final Logger logger = LogManager.getLogger(UploadDataProcessorImpl.class);
@Autowired
CandidateService candidateService;
.
.
.
.
@Transactional
public void processData(UploadResponseBean responseBean, long userId, String roleTypeId, List<Candidate> candidateList, int candidatesProcessed) throws Exception {
//get user details for the user id
User user = userService.getUserProfile(userId);
int recordsProcessed = 0;
for(Candidate candidate : candidateList) {
//validations.....
try {
//increment the number of records processed
recordsProcessed++;
//more validations here & business logic....
//candidate not mapped to the campaign, create a new candidate campaign mapping (ccm)
CandidateCampaignMapping ccm = new CandidateCampaignMapping(campaign.getId(), candidateId, candidate.getEmail(), candidate.getMobile(), userId);
ccmService.create(ccm);
//External API called by this jmRequestProcessor
candidate.setId(candidateId);
jmRequestProcessor.setCandidate(candidate);
jmRequestProcessor.setRoleTypeId(campaign.getRoleTypeId());
jmRequestProcessor.setUser(user);
jmRequestProcessor.setCampaignName(campaignName.toString());
jmRequestProcessor.setCandidateCampaignMapping(ccm);
taskExecutor.execute(jmRequestProcessor); //Calls the runnable here
successCount++;
} catch(ValidationException ve) {
logger.error("Error while processing candidate : " + candidate.getEmail() + " : " + ve.getErrorMessage());
candidate.setUploadErrorMessage(ve.getErrorMessage());
failedCandidateList.add(candidate);
failureCount++;
} catch(Exception e) {
logger.error("Error while processing candidate : " + candidate.getEmail() + " : " + e.getMessage());
candidate.setUploadErrorMessage(IErrorMessages.INTERNAL_SERVER_ERROR);
failedCandidateList.add(candidate);
failureCount++;
}
}
responseBean.setFailureCount(failureCount);
responseBean.setSuccessCount(successCount);
//More business logic here
}
}