Spring Integration несколько потребителей очереди - PullRequest
0 голосов
/ 03 августа 2020

В результате Spring интеграции MessageQueue без опроса у меня есть опросчик, который мгновенно потребляет сообщения из очереди, используя настраиваемый TaskScheduler:

ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setThreadNamePrefix("resultProcessor-");

IntegrationFlows
  .from("inbound")
  .channel(MessageChannels.priority().get())
  .bridge(bridge -> bridge
    .taskScheduler(taskScheduler)
    .poller(Pollers.fixedDelay(0).receiveTimeout(Long.MAX_VALUE)))
  .fixedSubscriberChannel()
  .route(inboundRouter())
  .get()

Теперь я бы хотел для одновременного использования нескольких потоков, поэтому я попробовал:

ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setThreadNamePrefix("resultProcessor-");
scheduler.setPoolSize(4);

Однако, поскольку в AbstractPollingEndpoint планировщик задач планирует синхронный опросчик (это немного сложно), создается только 1 поток. Если я устанавливаю TaskExecutor на любое значение, кроме SyncTaskExecutor (по умолчанию), я сталкиваюсь с потоком запланированных задач (см. Spring интеграция MessageQueue без опроса ).

Как я могу одновременно потреблять из очередь в Spring Integration? Это кажется довольно примитивным c, но я не смог найти решения.

Вместо очереди я мог бы использовать ExecutorChannel, однако (AFAIK) я теряю такие функции очереди, как приоритет, размер очереди, и показатели, на которые я полагаюсь.

Ответы [ 2 ]

1 голос
/ 05 августа 2020

Мне удалось решить это так:

  • Однопоточный планировщик задач, который выполняет опрос
  • Исполнитель пула потоков с синхронной очередью

Таким образом, планировщик задач может дать каждому исполнителю 1 задачу и блокировать, когда ни один исполнитель не свободен, таким образом, не опустошая исходную очередь или не рассылая спам задачи.

  @Bean
  public IntegrationFlow extractTaskResultFlow() {
    return IntegrationFlows
      .from(ChannelNames.TASK_RESULT_QUEUE)
      .bridge(bridge -> bridge
        .taskScheduler(taskResultTaskScheduler())
        .poller(Pollers
          .fixedDelay(0)
          .taskExecutor(taskResultExecutor())
          .receiveTimeout(Long.MAX_VALUE)))
      .handle(resultProcessor)
      .channel(ChannelNames.TASK_FINALIZER_CHANNEL)
      .get();
  }

  @Bean
  public TaskExecutor taskResultExecutor() {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
      1, // corePoolSize
      8, // maximumPoolSize
      1L, // keepAliveTime
      TimeUnit.MINUTES,
      new SynchronousQueue<>(),
      new CustomizableThreadFactory("resultProcessor-")
    );
    executor.setRejectedExecutionHandler(new CallerBlocksPolicy(Long.MAX_VALUE));
    return new ErrorHandlingTaskExecutor(executor, errorHandler);
  }

  @Bean
  public TaskScheduler taskResultTaskScheduler() {
    ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
    scheduler.setThreadNamePrefix("resultPoller-");
    return scheduler;
  }

связанный вопрос, теперь он напоминает мое реальное решение)

1 голос
/ 03 августа 2020

См. PollerSpec.taskExecutor():

/**
 * Specify an {@link Executor} to perform the {@code pollingTask}.
 * @param taskExecutor the {@link Executor} to use.
 * @return the spec.
 */
public PollerSpec taskExecutor(Executor taskExecutor) {

Таким образом, после периодического планирования задачи в соответствии с вашими taskScheduler и delay, реальная задача выполняется в потоке от этого предоставленного исполнителя. По умолчанию он действительно выполняет задачу в потоке планировщика.

UPDATE

Я не уверен, соответствует ли это вашим требованиям, но это единственный способ сохранить ваши очередь logi c и обрабатывать все, что идет параллельно:

 .bridge(bridge -> bridge
    .taskScheduler(taskScheduler)
    .poller(Pollers.fixedDelay(0).receiveTimeout(Long.MAX_VALUE)))
 .channel(channels -> channel.executor(threadPoolExecutor()))   
 .fixedSubscriberChannel()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...