Отдайте приоритет удаленным каталогам SFTP - PullRequest
0 голосов
/ 23 января 2019

Используя один канал SFTP, мне нужно обработать два удаленных каталога с низким приоритетом и с высоким приоритетом, но файлы с низким приоритетом выбирают после высокого приоритета.пожалуйста, дайте знать, как обрабатывать несколько каталогов во входящем адаптере SFTP с одного канала?Мы можем использовать рекомендации https://docs.spring.io/spring-integration/reference/html/sftp.html#sftp-rotating-server-advice Rotation Service в Spring 5.1.2 Release, но как насчет 4.3.12 Release .?

1 Ответ

0 голосов
/ 23 января 2019

Недоступно в 4.3.x;эта функция была добавлена ​​в 5.0.7.

Требуются изменения инфраструктуры, поэтому будет сложно скопировать пользовательский код в 4.3.x.

Вы можете использовать два адаптера и остановить / запустить ихпри необходимости.

РЕДАКТИРОВАТЬ

Вот одно решение;рекомендация по первичному потоку запускает вторичный поток, когда новые файлы не найдены.Вторичный поток запускается только один раз, затем перезапускает основной поток;и цикл продолжается ...

@SpringBootApplication
public class So54329898Application {

    public static void main(String[] args) {
        SpringApplication.run(So54329898Application.class, args);
    }

    @Bean
    public IntegrationFlow primary(SessionFactory<LsEntry> sessionFactory) {
        return IntegrationFlows.from(Sftp.inboundAdapter(sessionFactory)
                .localDirectory(new File("/tmp/foo"))
                .remoteDirectory("foo/foo"), e -> e
                        .poller(Pollers.fixedDelay(5_000, 5_000)
                                .advice(startSecondaryAdvice())))
                .channel("channel")
                .get();
    }

    @Bean
    public IntegrationFlow secondary(SessionFactory<LsEntry> sessionFactory) {
        return IntegrationFlows.from(Sftp.inboundAdapter(sessionFactory)
                .localDirectory(new File("/tmp/foo"))
                .remoteDirectory("foo/bar"), e -> e
                        .poller(Pollers.trigger(oneShotTrigger(sessionFactory)))
                        .autoStartup(false))
                .channel("channel")
                .get();
    }

    @Bean
    public IntegrationFlow main() {
        return IntegrationFlows.from("channel")
                .handle(System.out::println)
                .get();
    }

    @Bean
    public Advice startSecondaryAdvice() {
        return new StartSecondaryWhenPrimaryIdle();
    }

    @Bean
    public FireOnceTrigger oneShotTrigger(SessionFactory<LsEntry> sessionFactory) {
        return new FireOnceTrigger((Lifecycle) primary(sessionFactory));
    }

    public static class StartSecondaryWhenPrimaryIdle extends AbstractMessageSourceAdvice
            implements ApplicationContextAware {

        private ApplicationContext applicationContext;

        @Override
        public boolean beforeReceive(MessageSource<?> source) {
            return true;
        }

        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.applicationContext = applicationContext;
        }

        @Override
        public Message<?> afterReceive(Message<?> result, MessageSource<?> source) {
            if (result == null) {
                System.out.println("No more files on primary; starting single shot on secondary");
                this.applicationContext.getBean("primary", Lifecycle.class).stop();
                this.applicationContext.getBean("secondary", Lifecycle.class).stop();
                this.applicationContext.getBean(FireOnceTrigger.class).reset();
                this.applicationContext.getBean("secondary", Lifecycle.class).start();
            }
            return result;
        }

    }

    public static class FireOnceTrigger implements Trigger {

        private final Lifecycle primary;

        private volatile boolean done;

        public FireOnceTrigger(Lifecycle primary) {
            this.primary = primary;
        }

        @Override
        public Date nextExecutionTime(TriggerContext triggerContext) {
            if (done) {
                System.out.println("One shot on secondary complete; restarting primary");
                this.primary.start();
                return null;
            }
            done = true;
            return new Date();
        }

        public void reset() {
            done = false;
        }

    }

}
...