Как остановить или приостановить опрос после сбоя пакетного задания? - PullRequest
0 голосов
/ 19 мая 2018

Мы используем Spring-Batch-Integration для обработки файлов .json из каталога.Необходимо остановить обработку после сбоя, выяснить проблему (изменить файл с проблемой или другие решения), а затем продолжить.Текущая конфигурация продолжает опрос после ошибки.Как это изменить?или может быть есть другой подход для такого сценария.

@Configuration
@IntegrationComponentScan
@EnableIntegration
public class IntegrationConfig {

private @Autowired Job job;

@Bean
@ServiceActivator(inputChannel = "jobChannel", 
   outputChannel = "errorChannel")
protected JobLaunchingMessageHandler launcher(JobLauncher jobLauncher) {
    return new JobLaunchingMessageHandler(jobLauncher);
}

@Bean
public MessageChannel fileInputChannel() {
    return new DirectChannel();
}

@Bean
@InboundChannelAdapter(value = "fileInputChannel",
                       poller = @Poller(fixedDelay = "5000"))
public MessageSource<File> fileReadingMessageSource() {
    FileReadingMessageSource source = new FileReadingMessageSource();
    source.setDirectory(new File("C:/Temp/myfiles/"));
    source.setFilter(new SimplePatternFileListFilter("*.json"));
    source.setScanEachPoll(true);
    source.setUseWatchService(true);
    return source;
}

@Transformer(inputChannel = "fileInputChannel",
             outputChannel = "jobChannel")
public JobLaunchRequest transform(File aFile) {
    String fileName = aFile.getAbsolutePath();
    JobParameters jobParameters =
    new JobParametersBuilder().addString("input.file.name", fileName)
            .addDate("dateTime", new Date()).toJobParameters();
    JobLaunchRequest request = new JobLaunchRequest(job, jobParameters);
    return request;
}
}

пример был взят из этой статьи

Ни outputChannel = "nullChannel", ни outputChannel = "errorChannel" не помогают

Ответы [ 2 ]

0 голосов
/ 21 мая 2018

Я добавил

@Bean
@DependsOn("fileInputChannel")
@ServiceActivator(inputChannel = "errorChannel", 
  outputChannel = "nullChanel")
protected ErrorLogger errorLogger(JobLauncher jobLauncher) {
    return new ErrorLogger();
}

и

public class ErrorLogger {
private static final Logger logger = 
LoggerFactory.getLogger(ErrorLogger.class);

@Autowired
private SourcePollingChannelAdapter fileInputChannel;


@ServiceActivator
public void logError(Message<JobExecution> message) {
    JobExecution msgex=message.getPayload();
     if (msgex.getStatus() == BatchStatus.FAILED) {
         logger.error("Exception " + 
         msgex.getExitStatus().getExitDescription());
         fileInputChannel.stop();
     }
}
}

Но я получаю ошибку автопроводки в ErrorLogger

Unsatisfied dependency expressed through field 'fileInputChannel'; nested 
exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: 
No qualifying bean of type 
'org.springframework.integration.endpoint.SourcePollingChannelAdapter' 
available:

Похоже, проблема порядка инициализации несмотря на @DependsOn ("fileInputChannel") , потому что я могу автоматически загружать его в отдельный контроллер без ошибок.

Работает только с

 @Autowired(required = false)
 private SourcePollingChannelAdapter fileInputChannel;
0 голосов
/ 19 мая 2018

Вам необходимо остановить адаптер входящего канала.

Вы можете автоматически подключить SourcePollingChannelAdapter, который зарегистрирован в аннотации @InboundChannelAdapter.

При обнаружении сбоя вызовите stop() на адаптере.

...