Изящно отключить поток интеграции пружины - PullRequest
0 голосов
/ 15 ноября 2018

У меня есть приложение для интеграции пружин, которое я хочу завершить корректно. Приложение запускается в док-контейнере, и поток, который я хочу завершить, передает приличное количество файлов XML из одной внешней системы в другую.

Требования следующие: если приложение должно завершить работу, текущая передача файлов должна быть завершена, и после этого не нужно больше трогать файлы.

Что я узнал и сделал до сих пор: - Docker Stop отправляет SIGTERM в основной процесс контейнеров, за которым следует SIGKILL через 10 секунд (настраивается с помощью опции --time = x) - Я реализовал ApplicationListener и зарегистрировал его как @Bean, поэтому он будет зарегистрирован в контексте приложений. - Поток использует модуль опроса с TransactionsManager, поэтому ApplicationListener может определить, есть ли у опроса открытая транзакция, и если это так, прослушиватель ожидает некоторое время.

Моя проблема сейчас: С этим решением я могу дождаться окончания текущей передачи файла, но не могу сказать потоку прекратить чтение входящих файлов. Если передача завершена и другой файл прибыл в то время, когда ApplicationListener ожидал, Flow захватит файл и начнет другую передачу, которая, вероятно, прекратится, когда прибудет SIGKILL. Инжекция потока как Lifecycle и вызов stop (), похоже, не работают, как я думал.

У меня вопрос, есть ли способ сказать Потоку, что он должен закончить свою работу, но не должен слушать какие-либо поступающие сообщения?

Вот мой код: OutboundFlow:

  @Bean
  public PseudoTransactionManager transactionManager() {
    return new PseudoTransactionManager();
  }

  @Bean
  public TransactionSynchronizationFactory transactionSynchronizationFactory() {
    final ExpressionEvaluatingTransactionSynchronizationProcessor processor = new ExpressionEvaluatingTransactionSynchronizationProcessor();
    processor.setBeanFactory(beanFactory);
    return new DefaultTransactionSynchronizationFactory(processor);
  }


  @Bean
  public PollerSpec orderOutboundFlowTempFileInPoller() {
    return Pollers
        .fixedDelay(pollerDelay)
        .maxMessagesPerPoll(100)
        .transactional(transactionManager())
        .transactionSynchronizationFactory(transactionSynchronizationFactory());
  }

  @Bean
  public IntegrationFlow orderOutboundFlowTempFileIn() {
    return IntegrationFlows
        .from(Files.inboundAdapter(new File(temporaryPath + '/' + OrderUtils.SUBDIR_TMP_ORDER))
                .filterFunction(
                    f -> OrderUtils.fileInputFilter(f, partnerConfigRepo, "orderOutboundFlowTempFileIn")),
            e -> e.poller(orderOutboundFlowTempFileInPoller())) ...

Применение:

  public static void main(final String[] args) throws Exception {
    SpringApplication.run(App.class, args);
  }

  @Bean
  public GracefulShutdown gracefulShutdown() {
    return new GracefulShutdown();
  }

  private static class GracefulShutdown implements ApplicationListener<ContextClosedEvent> {
    private static final Logger LOG = LoggerFactory.getLogger(GracefulShutdown.class);

    @Autowired
    private Lifecycle orderOutboundFlowTempFileIn;

    @Override public void onApplicationEvent(ContextClosedEvent event) {
      LOG.info("Trying to gracefully shutdown App");
      ApplicationContext context = event.getApplicationContext();
      PollerSpec outboundFlowTempFileInPoller = context.getBean(PollerSpec.class, "orderOutboundFlowTempFileInPoller");

      orderOutboundFlowTempFileIn.stop();
      TransactionInterceptor transactionManager = (TransactionInterceptor) (outboundFlowTempFileInPoller.get()
          .getAdviceChain()
          .iterator().next());
      if (transactionManager.getTransactionManager() instanceof AbstractPlatformTransactionManager) {
        final TransactionStatus transaction = transactionManager.getTransactionManager().getTransaction(null);
        LOG.info("This is the transaction: " + transaction.toString() + ", isActive? " + !transaction.isCompleted());
        while (!transaction.isCompleted()) {
          try {
            LOG.info("Still active, waiting 30 more seconds");
            Thread.sleep(30000);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        }
        LOG.info("Transaction completed");
      }
    }
  }

1 Ответ

0 голосов
/ 15 ноября 2018

Вы на самом деле просто должны остановить SourcePollingChannelAdapter для Files.inboundAdapter(). Для этого вам нужно добавить .id() к e лямбде. И используйте этот идентификатор, чтобы получить SourcePollingChannelAdapter, когда вам нужно остановить его.

Таким образом, вы прекращаете получать новые файлы немедленно, и они на лету будут завершены должным образом.

Отсюда нет причин останавливать весь поток, поскольку все нижестоящие компоненты пассивны.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...