Как прекратить опрос после получения сообщения? Весенняя интеграция - PullRequest
0 голосов
/ 15 марта 2020

Я хочу опросить файл в каталоге и остановить опрос, как только файл найден. Я очень новичок в Spring Framework, и многое из этого все еще очень запутанно. Проведя некоторое исследование, я нашел несколько способов сделать это, но мне не повезло ни с одним из них.

Одним из способов является использование шины управления, как показано здесь . Однако, кажется, что опрос просто прекращается через 2 секунды. Я не уверен, как включить условие, чтобы остановить только при получении файла.

Другой способ - использовать «Smart Polling», как было сказано здесь . Ссылка в ответе старая, но она указывает на официальные документы Spring здесь: Smart Polling . Благодаря статье я узнал о AbstractMessageSourceAdvice и SimpleActiveIdleMessageSourceAdvice. Последнее, кажется, соответствует моей цели и будет самым простым в реализации, поэтому я решил дать ему go. Мои коды следующие:

IntegrationConfig. java

package com.example.springexample;

import java.io.File;

import org.aopalliance.aop.Advice;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.aop.SimpleActiveIdleMessageSourceAdvice;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.file.FileReadingMessageSource;
import org.springframework.integration.file.filters.SimplePatternFileListFilter;
import org.springframework.integration.util.DynamicPeriodicTrigger;
import org.springframework.messaging.MessageChannel;

@Configuration
@EnableIntegration
public class IntegrationConfig {

    @Bean
    public IntegrationFlow advised() {
        return IntegrationFlows.from("fileInputChannel")
                .handle("runBatchScript", "run", c -> c.advice(stopPollingAdvice()))
                .get();
    }

    @Bean
    public MessageChannel fileInputChannel() {
        return new DirectChannel();
    }

    @Bean
    @InboundChannelAdapter(value = "fileInputChannel", poller = @Poller(fixedDelay = "1000"))
    public MessageSource<File> fileReadingMessageSource() {
        FileReadingMessageSource source = new FileReadingMessageSource();
        source.setDirectory(new File("."));
        source.setFilter(new SimplePatternFileListFilter("*.bat"));
        return source;
    }

    @Bean
    public RunBatchScript runBatchScript() {
        return new RunBatchScript();
    }

    @Bean
    public Advice stopPollingAdvice() {
        DynamicPeriodicTrigger trigger = new DynamicPeriodicTrigger(10000);
        SimpleActiveIdleMessageSourceAdvice advice = new SimpleActiveIdleMessageSourceAdvice(trigger);
        advice.setActivePollPeriod(60000);
        return advice;
    }
}

RunBatchScript. java

package com.example.springexample;

import java.io.IOException;
import java.util.Date;
import java.util.logging.Logger;

public class RunBatchScript {

    Logger logger = Logger.getLogger(RunBatchScript.class.getName());

    public void run() throws IOException {
        logger.info("Running the batch script at " + new Date());
        Runtime.getRuntime().exec("cmd.exe /c simplebatchscript.bat");
        logger.info("Finished running the batch script at " + new Date());
    }
}

SpringExampleApplication. java

package com.example.springexample;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringExampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringExampleApplication.class, args);
    }

}

Я использовал это и это в качестве основы для моих кодов. Тем не менее, похоже, что он не работает, так как устройство опроса по-прежнему опрашивает каждую 1 секунду вместо новых 10 или 60 секунд. Более того, я не уверен, как на самом деле остановить опрос. Я попытался поместить null в конструктор для SimpleActiveIdleMessageSource, но он просто возвращает NullPointerException.

Вывод при запуске приложения:

2020-03-15 13:57:46.081  INFO 37504 --- [ask-scheduler-1] c.example.springexample.RunBatchScript   : Running the batch script at Sun Mar 15 13:57:46 SRET 2020
2020-03-15 13:57:46.084  INFO 37504 --- [ask-scheduler-1] c.example.springexample.RunBatchScript   : Finished running the batch script at Sun Mar 15 13:57:46 SRET 2020
2020-03-15 13:57:47.085  INFO 37504 --- [ask-scheduler-2] c.example.springexample.RunBatchScript   : Running the batch script at Sun Mar 15 13:57:47 SRET 2020
2020-03-15 13:57:47.087  INFO 37504 --- [ask-scheduler-2] c.example.springexample.RunBatchScript   : Finished running the batch script at Sun Mar 15 13:57:47 SRET 2020
2020-03-15 13:57:48.089  INFO 37504 --- [ask-scheduler-1] c.example.springexample.RunBatchScript   : Running the batch script at Sun Mar 15 13:57:48 SRET 2020
2020-03-15 13:57:48.092  INFO 37504 --- [ask-scheduler-1] c.example.springexample.RunBatchScript   : Finished running the batch script at Sun Mar 15 13:57:48 SRET 2020
2020-03-15 13:57:49.093  INFO 37504 --- [ask-scheduler-3] c.example.springexample.RunBatchScript   : Running the batch script at Sun Mar 15 13:57:49 SRET 2020
2020-03-15 13:57:49.096  INFO 37504 --- [ask-scheduler-3] c.example.springexample.RunBatchScript   : Finished running the batch script at Sun Mar 15 13:57:49 SRET 2020

Любая помощь с некоторым кодом с благодарностью.

1 Ответ

3 голосов
/ 15 марта 2020

Вам следует подать SimpleActiveIdleMessageSourceAdvice на @InboundChannelAdapter. Кроме того, триггер SimpleActiveIdleMessageSourceAdvice должен совпадать с триггером, используемым для опроса файлов:

    @Bean
    @EndpointId("fileInboundChannelAdapter")
    @InboundChannelAdapter(value = "fileInputChannel", poller = @Poller("fileReadingMessageSourcePollerMetadata"))
    public MessageSource<File> fileReadingMessageSource() {
        FileReadingMessageSource source = new FileReadingMessageSource();
        source.setDirectory(new File("."));
        source.setFilter(new SimplePatternFileListFilter("*.bat"));
        return source;
    }

    @Bean
    public PollerMetadata fileReadingMessageSourcePollerMetadata() {
        PollerMetadata meta = new PollerMetadata();

        DynamicPeriodicTrigger trigger = new DynamicPeriodicTrigger(1000);

        SimpleActiveIdleMessageSourceAdvice advice = new SimpleActiveIdleMessageSourceAdvice(trigger);
        advice.setActivePollPeriod(60000);

        meta.setTrigger(trigger);
        meta.setAdviceChain(List.of(advice));
        meta.setMaxMessagesPerPoll(1);
        return meta;
    }

Обратите внимание, что SimpleActiveIdleMessageSourceAdvice просто в следующий раз изменится на опрос файлов. Вы можете установить его на очень большое число, например несколько тысяч лет спустя, что может каким-то образом достичь вашего намерения, которое никогда не опрашивает файл снова при вашей жизни. Но поток планировщика, который опрашивает файл, все еще активен.

Если вы действительно хотите отключить этот поток планировщика, вы можете отправить сигнал выключения на шину управления.

Сначала определите шину управления:

    @Bean
    public IntegrationFlow controlBusFlow() {
        return IntegrationFlows.from("controlBus")
                  .controlBus()
                  .get();
    }

Затем реализует AbstractMessageSourceAdvice, который отправляет сигнал отключения на управляющую шину после опроса файла:

@Service
public class StopPollingAdvice extends AbstractMessageSourceAdvice{

    @Lazy
    @Qualifier("controlBus")
    @Autowired
    private MessageChannel controlBusChannel;


    @Override
    public boolean beforeReceive(MessageSource<?> source) {
        return super.beforeReceive(source);
    }

    @Override
    public Message<?> afterReceive(Message<?> result, MessageSource<?> source) {
        Message operation = MessageBuilder.withPayload("@fileInboundChannelAdapter.stop()").build();
        controlBusChannel.send(operation);
        return result;
    }
}

и изменяет PollerMetadata, который опрашивает файлы, на:

@Bean
public PollerMetadata fileReadingMessageSourcePollerMetadata(StopPollingAdvice stopPollingAdvice) {
    PollerMetadata meta = new PollerMetadata(); 
    meta.setTrigger(new PeriodicTrigger(1000));
    meta.setAdviceChain(List.of(stopPollingAdvice));
    meta.setMaxMessagesPerPoll(1);
    return meta;
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...