У меня есть приложение для интеграции пружин, которое я хочу завершить корректно.
Приложение запускается в док-контейнере, и поток, который я хочу завершить, передает приличное количество файлов XML из одной внешней системы в другую.
Требования следующие: если приложение должно завершить работу, текущая передача файлов должна быть завершена, и после этого не нужно больше трогать файлы.
Что я узнал и сделал до сих пор:
- Docker Stop отправляет SIGTERM в основной процесс контейнеров, за которым следует SIGKILL через 10 секунд (настраивается с помощью опции --time = x)
- Я реализовал ApplicationListener и зарегистрировал его как @Bean, поэтому он будет зарегистрирован в контексте приложений.
- Поток использует модуль опроса с TransactionsManager, поэтому ApplicationListener может определить, есть ли у опроса открытая транзакция, и если это так, прослушиватель ожидает некоторое время.
Моя проблема сейчас:
С этим решением я могу дождаться окончания текущей передачи файла, но не могу сказать потоку прекратить чтение входящих файлов. Если передача завершена и другой файл прибыл в то время, когда ApplicationListener ожидал, Flow захватит файл и начнет другую передачу, которая, вероятно, прекратится, когда прибудет SIGKILL.
Инжекция потока как Lifecycle и вызов stop (), похоже, не работают, как я думал.
У меня вопрос, есть ли способ сказать Потоку, что он должен закончить свою работу, но не должен слушать какие-либо поступающие сообщения?
Вот мой код:
OutboundFlow:
@Bean
public PseudoTransactionManager transactionManager() {
return new PseudoTransactionManager();
}
@Bean
public TransactionSynchronizationFactory transactionSynchronizationFactory() {
final ExpressionEvaluatingTransactionSynchronizationProcessor processor = new ExpressionEvaluatingTransactionSynchronizationProcessor();
processor.setBeanFactory(beanFactory);
return new DefaultTransactionSynchronizationFactory(processor);
}
@Bean
public PollerSpec orderOutboundFlowTempFileInPoller() {
return Pollers
.fixedDelay(pollerDelay)
.maxMessagesPerPoll(100)
.transactional(transactionManager())
.transactionSynchronizationFactory(transactionSynchronizationFactory());
}
@Bean
public IntegrationFlow orderOutboundFlowTempFileIn() {
return IntegrationFlows
.from(Files.inboundAdapter(new File(temporaryPath + '/' + OrderUtils.SUBDIR_TMP_ORDER))
.filterFunction(
f -> OrderUtils.fileInputFilter(f, partnerConfigRepo, "orderOutboundFlowTempFileIn")),
e -> e.poller(orderOutboundFlowTempFileInPoller())) ...
Применение:
public static void main(final String[] args) throws Exception {
SpringApplication.run(App.class, args);
}
@Bean
public GracefulShutdown gracefulShutdown() {
return new GracefulShutdown();
}
private static class GracefulShutdown implements ApplicationListener<ContextClosedEvent> {
private static final Logger LOG = LoggerFactory.getLogger(GracefulShutdown.class);
@Autowired
private Lifecycle orderOutboundFlowTempFileIn;
@Override public void onApplicationEvent(ContextClosedEvent event) {
LOG.info("Trying to gracefully shutdown App");
ApplicationContext context = event.getApplicationContext();
PollerSpec outboundFlowTempFileInPoller = context.getBean(PollerSpec.class, "orderOutboundFlowTempFileInPoller");
orderOutboundFlowTempFileIn.stop();
TransactionInterceptor transactionManager = (TransactionInterceptor) (outboundFlowTempFileInPoller.get()
.getAdviceChain()
.iterator().next());
if (transactionManager.getTransactionManager() instanceof AbstractPlatformTransactionManager) {
final TransactionStatus transaction = transactionManager.getTransactionManager().getTransaction(null);
LOG.info("This is the transaction: " + transaction.toString() + ", isActive? " + !transaction.isCompleted());
while (!transaction.isCompleted()) {
try {
LOG.info("Still active, waiting 30 more seconds");
Thread.sleep(30000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
LOG.info("Transaction completed");
}
}
}