Spring RabbitListener вызывает метод после отправки сообщения - PullRequest
0 голосов
/ 21 октября 2019

После того, как определенное количество сообщений обработано моим приемником, мне нужно остановить прослушиватель Rabbit и вызвать метод, после которого прослушиватель снова включается.

@RabbitListener(...)
public void sink(Message msg) {
    processMsg();

    if (condition) {
        rabbitListenerEndpointRegistry.stop();
        doTask();
        rabbitListenerEndpointRegistry.start();
    }
}

К сожалению, транзакция завершится неудачно, и сообщениевернусь в очередь, если я остановлю слушателя внутри метода приемника. Я ищу способ вызвать метод после завершения транзакции, и слушатель не удерживает никаких сообщений.

  1. Получите сообщение и обработайте сообщение
  2. Завершить транзакцию и выпустить сообщение <- <strong>Я не хочу получать новые сообщения после этой точки
  3. Если условие выполнено, остановите слушателя
  4. Выполнитедлительное задание
  5. Запустить слушатель

Я не могу перейти к ручному управлению транзакциями, потому что это потребует слишком много изменений в моем коде, и я не могуудерживайте любые сообщения, когда моя пользовательская задача выполняется, потому что это долго выполняемая задача, и я хочу, чтобы другие работники обрабатывали сообщения в течение этого времени.

setAfterReceivePostProcessors и setAdviceChain в заводской конфигурации Rabbit не будут работатьв моем случае, потому что в обоих случаях метод будет вызываться, когда слушатель удерживает сообщение.

Ответы [ 2 ]

1 голос
/ 22 октября 2019

Кажется, что контейнер для прослушивания сообщений не является правильным подходом для вашего приложения;вместо этого вам нужна модель "pull".

Вы можете использовать один из методов rabbitTemplate.receive() или receiveAndConvert() для извлечения сообщений по требованию.

Если вы используете их вне транзакциисообщение будет немедленно получено;запустите его в транзакции, если вы хотите, чтобы сообщение было подтверждено после завершения процесса.

1 голос
/ 22 октября 2019

Вы можете начать новый поток, если выполняется условие, которое остановит слушателя и выполнит какую-то задачу и запустит слушателя. Хотя не может быть гарантировано, что новый поток остановит слушателя до того, как новое сообщение будет использовано исходным слушателем. Но оно может работать приблизительно.

Вместо того, чтобы запускать поток вручную, вы можете создать новую очередь, в которой выможет опубликовать сообщение (например, начало / остановка), и новый слушатель может остановить первоначального слушателя, когда помещает сообщение с инструкцией в новую очередь, такую ​​как запуск или остановка.

Второй подход:

Вы можетепопробуйте другой подход, я думаю, что он должен работать, вместо остановки слушателя setMaxConcurrentConsumers равным 0, что должно отключить слушателя, не позволяя ему больше использовать сообщение без остановки слушателя. И после выполнения вашей задачи измените его на исходное значение, я думаю, в вашем случае его значение равно 1.

...