В результате Spring интеграции MessageQueue без опроса у меня есть опросчик, который мгновенно потребляет сообщения из очереди, используя настраиваемый TaskScheduler:
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setThreadNamePrefix("resultProcessor-");
IntegrationFlows
.from("inbound")
.channel(MessageChannels.priority().get())
.bridge(bridge -> bridge
.taskScheduler(taskScheduler)
.poller(Pollers.fixedDelay(0).receiveTimeout(Long.MAX_VALUE)))
.fixedSubscriberChannel()
.route(inboundRouter())
.get()
Теперь я бы хотел для одновременного использования нескольких потоков, поэтому я попробовал:
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setThreadNamePrefix("resultProcessor-");
scheduler.setPoolSize(4);
Однако, поскольку в AbstractPollingEndpoint
планировщик задач планирует синхронный опросчик (это немного сложно), создается только 1 поток. Если я устанавливаю TaskExecutor на любое значение, кроме SyncTaskExecutor
(по умолчанию), я сталкиваюсь с потоком запланированных задач (см. Spring интеграция MessageQueue без опроса ).
Как я могу одновременно потреблять из очередь в Spring Integration? Это кажется довольно примитивным c, но я не смог найти решения.
Вместо очереди я мог бы использовать ExecutorChannel
, однако (AFAIK) я теряю такие функции очереди, как приоритет, размер очереди, и показатели, на которые я полагаюсь.