Как объединить несколько слушателей (шаг, чтение, обработка, запись и пропуск) в весеннем пакете - PullRequest
0 голосов
/ 10 июля 2019

Целью этой операции является отслеживание строк или элементов, которые читаются / обрабатываются / записываются в пакетном задании с использованием нескольких шагов.

Я создал прослушиватель, который реализует следующие интерфейсы: StepExecutionListener, SkipPolicy, ItemReadListener, ItemProcessListener, ItemWriteListener

@Component
public class GenericListener implements StepExecutionListener, SkipPolicy, ItemReadListener, ItemProcessListener, ItemWriteListener {
    private Log logger = LogFactory.getLog(getClass());
    private JobExecution jobExecution;
    private int numeroProcess = 0;
    private int currentReadIndex = 0;
    private int currentProcessIndex = 0;
    private int currentWriteIndex = 0;

    @Override
    public void beforeRead() throws Exception {
        log.info(String.format("[read][line : %s]", currentReadIndex));
        currentReadIndex++;
    }
    @Override
    public void afterRead (Object o) throws Exception {
        log.info("Ligne correct");
    }
    @Override
    public void onReadError (Exception e) throws Exception {
        jobExecution.stop();
    }
    @Override
    public boolean shouldSkip (Throwable throwable, int i) throws SkipLimitExceededException {
        String err = String.format("Erreur a la ligne %s | message %s | cause %s | stacktrace %s", numeroProcess, throwable.getMessage(), throwable.getCause().getMessage(), throwable.getCause().getStackTrace());
        log.error(err);
        return true;
    }
    @Override
    public void beforeProcess (Object o) {
        log.debug(String .format("[process:%s][%s][Object:%s]", numeroProcess++, o.getClass(), o.toString()));
        currentProcessIndex++;
    }
    @Override
    public void afterProcess (Object o, Object o2) { }
    @Override
    public void onProcessError (Object o, Exception e) {
        String err = String.format("[ProcessError at %s][Object %s][Exception %s][Trace %s]", currentProcessIndex, o.toString(), e.getMessage(), e.getStackTrace());
        log.error(err);
        jobExecution.stop();
    }
    @Override
    public void beforeWrite (List list) {
        log.info(String .format("[write][chunk number:%s][current chunk size %s]", currentWriteIndex, list != null ? list.size() : 0));
        currentWriteIndex++;
    }
    @Override
    public void afterWrite (List list) { }
    @Override
    public void onWriteError (Exception e, List list) {
        jobExecution.stop();
    }
    @Override
    public void beforeStep(StepExecution stepExecution) {
        jobExecution = stepExecution.getJobExecution();
        currentReadIndex = 0;
        currentProcessIndex = 0;
        currentWriteIndex = 0;
    }
    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        return null;
    }
}

Определение задания (CustomJobListener - это простой класс, расширяющий JobExecutionListenerSupport)

public class BatchConfiguration {
    @Autowired
    public JobBuilderFactory jobs;

    @Bean
    public Job job(CustomJobListener listener,
                     @Qualifier("step1") Step step1,
                     @Qualifier("step2") Step step2,
                     @Qualifier("step3") Step step3) {
        return jobs.get("SimpleJobName")
                .incrementer(new RunIdIncrementer())
                .preventRestart()
                .listener(listener)
                .start(step1)
                .next(step2)
                .next(step3)
                .build();
    }
}

Определение шага (все три шага имеют одинаковое определение, толькоизменения чтения / процессора / записи)

@Component
public class StepControleFormat {
    @Autowired
    private StepOneReader reader;
    @Autowired
    private StepOneProcessor processor;
    @Autowired
    private StepOneWriter writer;
    @Autowired
    private ConfigAccess configAccess;
    @Autowired
    private GenericListener listener;
    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Bean
    @JobScope
    @Qualifier("step1")
    public Step stepOne() throws StepException {
        return stepBuilderFactory.get("step1")
                .<StepOneInput, StepOneOutput>chunk(configAccess.getChunkSize())
                .listener((ItemProcessListener<? super StepOneInput, ? super StepOneOutput>) listener)
                .faultTolerant()
                .skipPolicy(listener)
                .reader(reader.read())
                .processor(processor.compose())
                .writer(writer)
                .build();
    }
}

Теперь проблема в том, что методы beforeStep(StepExecution stepExecution) и afterStep(StepExecution stepExecution) не запускаются, но все другие методы в GenericListener запускаются правильно, когда происходят их соответствующие события.

Я пытался использовать listener((StepExecutionListener)listener) вместо listener((ItemProcessListener<? super StepOneInput, ? super StepOneOutput>) listener), но последний возвращает AbstractTaskletStepBuiler, а затем я не могу использовать reader, processor или writer.

Обновление : моя весенняя загрузочная версия: v1.5.9.RELEASE

1 Ответ

0 голосов
/ 15 июля 2019

Я решил это благодаря подсказке Майкла Минеллы:

@Bean
@JobScope
@Qualifier("step1")
public Step stepOne() throws StepException {
    SimpleStepBuilder<StepOneInput, StepOneOutput> builder = stepBuilderFactory.get("step1")
            .<StepOneInput, StepOneOutput>chunk(configAccess.getChunkSize())
            // setting up listener for Read/Process/Write
            .listener((ItemProcessListener<? super StepOneInput, ? super StepOneOutput>) listener)
            .faultTolerant()
            // setting up listener for skipPolicy
            .skipPolicy(listener)
            .reader(reader.read())
            .processor(processor.compose())
            .writer(writer);

    // for step execution listener
    builder.listener((StepExecutionListener)listener);

    return builder.build();
}

Последний вызванный listener метод public B listener(StepExecutionListener listener) из StepBuilderHelper<B extends StepBuilderHelper<B>> возвращает StepBuilderHelper, который не содержит определения для build() метод.Таким образом, решение состояло в том, чтобы разделить определение сборки шага.

Что я не понимаю, так это то, что метод writer возвращает SimpleStepBuilder<I, O>, который содержит определение для этого метода public SimpleStepBuilder listener(Object listener),Компилятор / IDE (IntelliJ IDEA) вызывает public B listener(StepExecutionListener listener) из StepBuilderHelper<B extends StepBuilderHelper<B>>.Если бы кто-нибудь мог помочь объяснить это поведение.

Более того, было бы очень интересно найти способ подключения всех слушателей за один вызов, используя public SimpleStepBuilder listener(Object listener) из SimpleStepBuilder.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...