Как выполнить опрос из сообщения очереди 1 за один раз после завершения нисходящего потока в Spring Integration - PullRequest
2 голосов
/ 22 апреля 2020

В настоящее время я работаю над повышением производительности в потоке интеграции, пытаясь распараллелить обработку сообщений. Я реализовал все с использованием Java DSL.

Текущий поток интеграции принимает сообщения из канала очереди с фиксированным поллером и последовательно обрабатывает сообщение через несколько обработчиков, пока не достигнет окончательного обработчика, который делает некоторые окончательные вычисления, учитывающие каждый вывод предыдущего обработчика. Все они подключены в одном потоке интеграции. И в основном эти обработчики переносят вызовы на внешние системы. Здесь важно сохранить то, что сообщение не должно быть взято из очереди, пока весь нисходящий поток не будет завершен для предыдущего. Для распараллеливания мне нужны обработчики.

Текущий поток интеграции: MessageQueue -> Poller -> Обработчик 1 -> Обработчик 2 -> Обработчик X -> Окончательный обработчик

Я попытался включить параллелизм делает следующее, и это работает довольно хорошо.

MessageQueue -> Poller -> Splitter -> Executor -> Маршрутизатор с сопоставлениями подпотока с различными обработчиками -> Aggregator -> Final Handler

Проблема При таком подходе я обнаружил, что новое сообщение берется из канала очереди, прежде чем предыдущее проходит через весь нисходящий поток. Совершенно ясно, почему, добавив Splitter и Executor, можно изменить способ посещения сообщений, но дело в том, что между результатами может быть зависимость между различными сообщениями.

Вопрос в том, как я могу получать сообщения из канал очереди по одному, как бы «приостанавливать» опросчик до тех пор, пока обрабатываемое сообщение не перейдет к последней конечной точке после агрегатора? Я не знаю, как переставить компоненты или что еще я могу сделать, чтобы добиться этого.

Извините, я пытался найти ответ, но не смог его найти ... пожалуйста, обратитесь за советом. Большое спасибо


@ Blink, это то, что сработало для меня, возможно, нужен какой-то рефакторинг, и я уверен, что это можно было бы написать более элегантно. Я не эксперт, извините.

Ну вот основные элементы c:

  1. Интерфейс для переноса системы сообщений
  2. Сообщение Канал, по которому Сообщение будет маршрутизироваться при вызове метода шлюза

    @Bean
    public DirectChannel integrationChannel() {
        return MessageChannels.direct().get();
    }
    
    @MessagingGateway
    interface WrappingGateway {
    
        @Gateway(requestChannel = "integrationChannel")
        TrackingLog executeIntegration(TrackingLog trackingLog);
    
    }
    

TrackingLog - это модель, которую я использую для регистрации результатов в нисходящем потоке.

И в основном Я называю шлюз обертывания в потоке интеграции, который извлекает сообщения из очереди сообщений.

@Autowired
WrappingGateway integrationGateway;

@Bean
public IntegrationFlow createCatalogueChannelFlow() {
    return IntegrationFlows.from(cataloguePriorityChannel())

            // Queue Poller
            .bridge(s -> s.poller(Pollers.fixedRate(1, TimeUnit.SECONDS).maxMessagesPerPoll(1)).autoStartup(true)
                    .id("cataloguePriorityChannelBridge"))

            // Call to Gateway 
            .handle(m -> {
                this.integrationGateway
                        .executeIntegration(((TrackingLog) m.getPayload()));
            })

            .get();
}

@Bean
public IntegrationFlow startCatalogueIntegrationChannelFlow() {
    return IntegrationFlows.from(integrationChannel())

            // Log
            .handle(trackerSupportClient, "logMessagePreExecution")

            // Set TrackingLog in message Header
            .enrichHeaders(e -> e.headerFunction("TRACKING_LOG", m -> {
                return ((TrackingLog) m.getPayload());
            }))
 ....

Вся интеграция немного сложнее, она начинается с Asyn c HTTP-шлюза, преобразователей и маршрутизаторов. , магазины в mongodb, et c. Дело в том, что, как подсказал мне @Artem Bilan, вызов шлюза блокирует поток и не позволяет получателю очередей получать больше сообщений, пока текущее не будет полностью обработано.

Надеюсь, это поможет вам.

1 Ответ

1 голос
/ 22 апреля 2020

Это действительно интересное задание ... Я поделюсь с вами своими мыслями, и вы выберете то, что подходит именно вам.

  1. Мы всегда можем заключить часть потока в @MessagingGateway, который должен ждать ответа. И уже не имеет значения, насколько asyn c является его подпотоком. Таким образом, вы можете выполнять эти задачи параллельно, но шлюз все еще будет ждать ответа в основном потоке, блокируя следующий опрос из очереди. Вы должны убедиться, что вы возвращаете что-то в конце подпотока в replyChannel, чтобы разблокировать основной поток. См. Документы здесь: https://docs.spring.io/spring-integration/docs/5.3.0.M4/reference/html/messaging-endpoints.html#gateway

  2. У нас есть BarrierMessageHandler готовый компонент. Смысл в том, чтобы блокировать текущий поток сообщением, пока не появится какой-либо триггер для корреляции, которой принадлежит сообщение. Единственная проблема с этим компонентом заключается в том, что вам нужно выяснить, как снять барьер для первого сообщения, поскольку именно это сообщение будет являться триггером для следующего. Хотя мы, вероятно, можем использовать одноразовый маршрутизатор, чтобы обойти этот барьер для первого сообщения ... Документы здесь: https://docs.spring.io/spring-integration/docs/5.3.0.M4/reference/html/message-routing.html#barrier

  3. У нас есть такой компонент, как MessageSourcePollingTemplate. Таким образом, вы можете вызывать QueueChannel, заключенную в MessageSource лямбду, всякий раз, когда вам нужно это сделать. Я не могу сейчас придумать, как это может вписаться в поток, но это еще одна идея, как приостановить опрос. См. Документы: https://docs.spring.io/spring-integration/docs/5.3.0.M4/reference/html/core.html#deferred -acks-message-source

  4. Другой способ - добавить MethodInterceptor в конфигурацию Poller, чтобы пропустить вызов * 1034. * если некоторое AtomicBoolean для состояния true. Таким образом, вы сохраняете состояние до тех пор, пока сообщение не будет обработано и каждая задача опроса будет пропущена, пока вы не сбросите это состояние. Документы: https://docs.spring.io/spring-integration/docs/5.3.0.M4/reference/html/messaging-endpoints.html#endpoint -просмотр потребителя

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...