Spring Integration - Multiple Flows, отдельный поток обработки - PullRequest
0 голосов
/ 10 февраля 2020

Я пытаюсь разработать решение Spring Integration для требования.

Мое требование состоит в том, чтобы «опрашивать» несколько почтовых ящиков в разное время и получать письма из «Входящих», сохранять их и перемещать почту в другая папка электронной почты.

Я использую Spring Integration DSL для регистрации каждого из потоков интеграции с соответствующим выражением imapInboundAdapter и cron (например, если у меня есть два почтовых ящика для очистки, один сканируется каждые 2 минуты, а другой - каждый 3 минуты)

Чтобы обработать (Service Activator) полученное сообщение, я вызываю отдельный метод - payloadProcessService.processMail (), который в основном сохраняет почту в объекте BLOB-объектов и перемещает сообщение в другую папку электронной почты. .

Кажется, все работает нормально, и каждый хрон запускается, как и ожидалось. Однако, когда обрабатывается первое задание cron (например, из-за большого вложения), запускается второе задание cron (которое должно охватить другой почтовый ящик), и это похоже на перегрузку / переопределение при обработке первого задания.

Как я могу создать разные потоки для каждой обработки и избежать наложения с методом обработки? Из того, что я прочитал, планировщик задач выглядит как вариант, но не уверен, будет ли каждый вызывающий метод выполняться отдельно для каждого потока интеграции. Как насчет @ Asyn c метода обработки? Это поможет? Любое направление приветствуется!

Я включил упрощенный код того, что я пытаюсь сделать.

// 1.

@Configuration
@EnableIntegration
public class DynamicFlowConfiguration {

    @Autowired
    private PayloadProcessService payloadProcessService;

    @Autowired
    Session mailSession;

    @Autowired
    private IntegrationFlowContext integrationFlowContext;

    @Bean
    public List<IntegrationFlow> registerFlows() throws Exception {

        List<MyMailBox> mailboxList = MyMailBoxService.list();
        List<IntegrationFlow> flows = new ArrayList<>();

        // (i) Develop a list of integration flows:
        mailboxList.forEach(eachMailbox -> {
                flows.add(getFlowBuilder(eachMailbox) 
                    .handle(payloadProcessService.processMail()) 
                    .get() 
                    );
                });

        // (ii) Register all flows, one per mailbox:
        flows.forEach(f -> this.integrationFlowContext.registration(f) 
                .register());

        return flows;
    }


    private IntegrationFlowBuilder getFlowBuilder(MyMailBox myMailBox)  {

        IntegrationFlowBuilder flowBuilder = null;

        // Get URL
        URLName urlName = new URLName("imaps", "mymailhost","myport", "Inbox", user, pwd);
        // Get MailBox specific CronExpression:
        String cronExpression = myMailBox.getMailboxPollingInterval();

        MailInboundChannelAdapterSpec adapterSpec;

        adapterSpec = Mail.imapInboundAdapter(urlName.toString()) 
                .searchTermStrategy(searchTermStrategy) 
                .javaMailProperties(getJavaMailProperties()) 
                .shouldMarkMessagesAsRead(true); 

            flowBuilder = IntegrationFlows.from(adapterSpec.simpleContent(true) 
                                                           .autoCloseFolder(true)
                                                           .shouldDeleteMessages(false),
                                                           e -> e.autoStartup(true)
                                                                 .poller(p -> p.cron(cronExpression).maxMessagesPerPoll(10))); //
        }

        return flowBuilder;
    }

}



//2.

@Service
public class PayloadProcessService {

    @Bean
    public MessageHandler processMail() {
        MessageHandler mh = new MessageHandler() {

            @Override
            public void handleMessage(org.springframework.messaging.Message<?> message)
                    throws org.springframework.messaging.MessagingException {

                MimeMessage mimeMessage = (MimeMessage) message.getPayload();

                    emailSave(mimeMessage);
                    move(mimeMessage);
            }
        };
        return mh;``
    }

}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...