Я пытаюсь разработать решение Spring Integration для требования.
Мое требование состоит в том, чтобы «опрашивать» несколько почтовых ящиков в разное время и получать письма из «Входящих», сохранять их и перемещать почту в другая папка электронной почты.
Я использую Spring Integration DSL для регистрации каждого из потоков интеграции с соответствующим выражением imapInboundAdapter и cron (например, если у меня есть два почтовых ящика для очистки, один сканируется каждые 2 минуты, а другой - каждый 3 минуты)
Чтобы обработать (Service Activator) полученное сообщение, я вызываю отдельный метод - payloadProcessService.processMail (), который в основном сохраняет почту в объекте BLOB-объектов и перемещает сообщение в другую папку электронной почты. .
Кажется, все работает нормально, и каждый хрон запускается, как и ожидалось. Однако, когда обрабатывается первое задание cron (например, из-за большого вложения), запускается второе задание cron (которое должно охватить другой почтовый ящик), и это похоже на перегрузку / переопределение при обработке первого задания.
Как я могу создать разные потоки для каждой обработки и избежать наложения с методом обработки? Из того, что я прочитал, планировщик задач выглядит как вариант, но не уверен, будет ли каждый вызывающий метод выполняться отдельно для каждого потока интеграции. Как насчет @ Asyn c метода обработки? Это поможет? Любое направление приветствуется!
Я включил упрощенный код того, что я пытаюсь сделать.
// 1.
@Configuration
@EnableIntegration
public class DynamicFlowConfiguration {
@Autowired
private PayloadProcessService payloadProcessService;
@Autowired
Session mailSession;
@Autowired
private IntegrationFlowContext integrationFlowContext;
@Bean
public List<IntegrationFlow> registerFlows() throws Exception {
List<MyMailBox> mailboxList = MyMailBoxService.list();
List<IntegrationFlow> flows = new ArrayList<>();
// (i) Develop a list of integration flows:
mailboxList.forEach(eachMailbox -> {
flows.add(getFlowBuilder(eachMailbox)
.handle(payloadProcessService.processMail())
.get()
);
});
// (ii) Register all flows, one per mailbox:
flows.forEach(f -> this.integrationFlowContext.registration(f)
.register());
return flows;
}
private IntegrationFlowBuilder getFlowBuilder(MyMailBox myMailBox) {
IntegrationFlowBuilder flowBuilder = null;
// Get URL
URLName urlName = new URLName("imaps", "mymailhost","myport", "Inbox", user, pwd);
// Get MailBox specific CronExpression:
String cronExpression = myMailBox.getMailboxPollingInterval();
MailInboundChannelAdapterSpec adapterSpec;
adapterSpec = Mail.imapInboundAdapter(urlName.toString())
.searchTermStrategy(searchTermStrategy)
.javaMailProperties(getJavaMailProperties())
.shouldMarkMessagesAsRead(true);
flowBuilder = IntegrationFlows.from(adapterSpec.simpleContent(true)
.autoCloseFolder(true)
.shouldDeleteMessages(false),
e -> e.autoStartup(true)
.poller(p -> p.cron(cronExpression).maxMessagesPerPoll(10))); //
}
return flowBuilder;
}
}
//2.
@Service
public class PayloadProcessService {
@Bean
public MessageHandler processMail() {
MessageHandler mh = new MessageHandler() {
@Override
public void handleMessage(org.springframework.messaging.Message<?> message)
throws org.springframework.messaging.MessagingException {
MimeMessage mimeMessage = (MimeMessage) message.getPayload();
emailSave(mimeMessage);
move(mimeMessage);
}
};
return mh;``
}
}