задание весеннего пакетного перезапуска начинается с начальной стадии, а не с того места, где оно было остановлено? - PullRequest
0 голосов
/ 20 декабря 2018

Я хочу реализовать функцию перезапуска задания, чтобы запустить его с первоначального мудреца.Я сталкиваюсь с двумя проблемами.

Первая проблема: Когда я перезапускаю работу в самый первый раз, она создает новый идентификатор экземпляра задания и ведет себя как свежая работа.Во второй раз он перезапустится и запустится с тем же идентификатором экземпляра задания.(Я отправил идентификатор выполнения с контроллера покоя)

Вторая проблема: он начнется с начальной стадии, когда я перезапущу его.

Пользовательский считыватель:

package com.orange.alc.dabekdataload.reader;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.batch.core.StepExecution;
    import org.springframework.batch.core.annotation.AfterStep;
    import org.springframework.batch.core.annotation.BeforeStep;
    import org.springframework.batch.item.ExecutionContext;
    import org.springframework.batch.item.ItemReader;
    import org.springframework.batch.item.ItemStream;
    import org.springframework.batch.item.ItemStreamException;
    import org.springframework.batch.item.NonTransientResourceException;
    import org.springframework.batch.item.ParseException;
    import org.springframework.batch.item.UnexpectedInputException;
    import org.springframework.batch.item.file.FlatFileItemReader;
    import org.springframework.batch.item.file.mapping.DefaultLineMapper;
    import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Scope;
    import org.springframework.context.annotation.ScopedProxyMode;
    import org.springframework.core.io.FileSystemResource;
    import org.springframework.stereotype.Component;

    import com.orange.alc.dabekdataload.constants.PostalHeader;
    import com.orange.alc.dabekdataload.dto.PostalDto;

    @Component("itemReader")
    @Scope(value = "step", proxyMode = ScopedProxyMode.TARGET_CLASS)
    public class PostalReader implements ItemReader<PostalDto>, ItemStream{

        private static final Logger LOGGER = LoggerFactory.getLogger(PostalReader.class);

        @Value("#{jobParameters[fullPathFileName]}")
        public String fileName;

        private int currentIndex = 0;

        private static final String CURRENT_INDEX = "current.index";

        private FlatFileItemReader<PostalDto> reader;

        @BeforeStep
        public void beforeStep(StepExecution stepExecution) {
            LOGGER.info("Executing batch reader...");
            reader = new FlatFileItemReader<>();
            reader.setResource(new FileSystemResource(fileName));
            reader.setLinesToSkip(1);
            reader.setLineMapper(new DefaultLineMapper<PostalDto>() {{
                setLineTokenizer(new DelimitedLineTokenizer() {{
                    setNames(PostalHeader.getPostalColumnNames());
                }});
                setFieldSetMapper(new PostalFieldSetMapper());
            }});
            reader.setSaveState(true);
            reader.open(stepExecution.getExecutionContext());

        }

        @Override
        public PostalDto read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
            reader.setCurrentItemCount(currentIndex++);
           return reader.read();
        }

        @AfterStep
        public void afterStep(StepExecution stepExecution) {
            LOGGER.info("Closing the reader...");
            reader.close();
        }

        @Override
        public void open(ExecutionContext executionContext) throws ItemStreamException {
            if(executionContext.containsKey(CURRENT_INDEX)){
                currentIndex = new Long(executionContext.getLong(CURRENT_INDEX)).intValue();
            } else{
                currentIndex = 0;
            }

        }

        @Override
        public void update(ExecutionContext executionContext) throws ItemStreamException {
            executionContext.putLong(CURRENT_INDEX, new Long(currentIndex).longValue());

        }

        @Override
        public void close() throws ItemStreamException {


        }


    }

Код перезапуска задания:

@Override
public void restartJob(Long jobId) throws JobInstanceAlreadyCompleteException, NoSuchJobExecutionException, NoSuchJobException, JobRestartException, JobParametersInvalidException {
    LOGGER.info("Restarting job with JobId: {}", jobId);
    jobOperator.restart(jobId);
}

Пожалуйста, дайте мне знать, если вам нужен какой-либо код с моей стороны.

1 Ответ

0 голосов
/ 20 декабря 2018

Программа чтения делегатов (FlatFileItemReader), используемая в пользовательской программе чтения (PostalReader), не выполняет контракт ItemStream.Вам необходимо вызвать open/update/close в программе чтения делегатов в соответствующих open/update/close методах вашей программы чтения элементов.Что-то вроде:

public class PostalReader implements ItemReader<PostalDto>, ItemStream{

   private FlatFileItemReader<PostalDto> reader;

   @Override
   public void open(ExecutionContext executionContext) throws ItemStreamException {
      reader.open(executionContext);
   }

   @Override
   public void update(ExecutionContext executionContext) throws ItemStreamException {
      reader.update(executionContext);
   }

   @Override
   public void close() throws ItemStreamException {
      reader.close();
   }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...