Пакетная многопоточная загрузка повторяется после первого сбоя - PullRequest
0 голосов
/ 07 декабря 2018

Ниже мой дизайн:

Процесс загрузки: получает список записей: имя, фамилия, мобильный телефон и электронная почта

=> Подтверждает данные

=>Создает запись в db

=> Создать новый класс, который реализует Runnable для вызова внешнего API для получения дополнительных данных

=> Продолжить дальше для получения дополнительной бизнес-логики

Я используюTaskExecutor для создания нового класса, который реализует Runnable.

Я загружаю более 50 записей одновременно.

Возникает следующая проблема:

Если я загружаю 10 записей, они успешно обрабатываются.

При загрузке все данные обрабатываются правильно и создаются необходимые выполняемые потоки.

Но если я загружу 30 записей: если 1 Runnable поток (скажем, в записи 5) имеет исключение во внешнем API, то только этот поток вызывается для всех оставшихся 25 раз с теми же данными.Идентификатор потока также тот же

Мой вопрос заключается в том, почему один и тот же исполняемый поток вызывается 25 раз.Разве исполнитель задачи не должен вызывать новый runnable для каждой записи?Разве JMRequestProcessor не должен завершаться после исключения?

Любая помощь / совет приветствуются.

Ниже приведен код потока и процессора загрузки.

package com.powerhire.uploadprocessor;

@Scope("prototype")
public class JMRequestProcessor implements Runnable {

    protected static final Logger logger = LogManager.getLogger(JMRequestProcessor.class);

    @Autowired
    JmJobService jmJobService;
.
.
.   
    private Candidate candidate;
.
.
.
.
    @Override
    public void run() {
        try {
            Date startTime = new Date();
            logger.info("Processing JM Record " + candidate.getEmail());

            jmJobService.sendJmCreateUpdateCandidateRequest(candidate, roleTypeId, campaignName, ccm);

            logger.info("Finished processing JM Record " + candidate.getEmail() + " ~ " + (Calendar.getInstance().getTime().getTime() - startTime.getTime()));
        } catch (Exception e) {
            e.printStackTrace();
            logger.error("Error executing the JM Request " + e.getMessage());
        }
    }
}

Подходящий класс

package com.powerhire.uploadprocessor.impl;

public class UploadDataProcessorImpl implements UploadDataProcessor {

    protected static final Logger logger = LogManager.getLogger(UploadDataProcessorImpl.class);

    @Autowired
    CandidateService candidateService;
.
.
.
.

    @Transactional
    public void processData(UploadResponseBean responseBean, long userId, String roleTypeId, List<Candidate> candidateList, int candidatesProcessed) throws Exception {

        //get user details for the user id
        User user = userService.getUserProfile(userId);

        int recordsProcessed = 0;
        for(Candidate candidate : candidateList) {
            //validations.....

            try {
                //increment the number of records processed
                recordsProcessed++;
                //more validations here & business logic....

                //candidate not mapped to the campaign, create a new candidate campaign mapping (ccm)
                CandidateCampaignMapping ccm = new CandidateCampaignMapping(campaign.getId(), candidateId, candidate.getEmail(), candidate.getMobile(), userId);
                ccmService.create(ccm);

                //External API called by this jmRequestProcessor
                candidate.setId(candidateId);
                jmRequestProcessor.setCandidate(candidate);
                jmRequestProcessor.setRoleTypeId(campaign.getRoleTypeId());
                jmRequestProcessor.setUser(user);
                jmRequestProcessor.setCampaignName(campaignName.toString());
                jmRequestProcessor.setCandidateCampaignMapping(ccm);
                taskExecutor.execute(jmRequestProcessor); //Calls the runnable here

                successCount++;
            } catch(ValidationException ve) {
                logger.error("Error while processing candidate : " + candidate.getEmail() + " : " + ve.getErrorMessage());
                candidate.setUploadErrorMessage(ve.getErrorMessage());
                failedCandidateList.add(candidate);
                failureCount++;
            } catch(Exception e) {
                logger.error("Error while processing candidate : " + candidate.getEmail() + " : " + e.getMessage());
                candidate.setUploadErrorMessage(IErrorMessages.INTERNAL_SERVER_ERROR);
                failedCandidateList.add(candidate);
                failureCount++;
            }
        }
        responseBean.setFailureCount(failureCount);
        responseBean.setSuccessCount(successCount);

       //More business logic here
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...