Получение только определенного сообщения от @RabbitListener - PullRequest
1 голос
/ 20 июня 2019

У меня есть устаревшая система, которая отправляет сообщение в RabbitMQ. Система использует только одну очередь: q.finance.invoice, но у нее есть два типа сообщений, где тип сообщения доступен в заголовке.

Первый тип

Type : invoice.created
{
  "field_1" : "",
  "field_2" : "",
}

Второй тип

Type : invoice.paid

{
  "field_5" : "",
  "field_6" : "",
}

Так что теперь мой потребитель должен обрабатывать сообщение выборочно на основе типа данных. Spring имеет @RabbitHandler, что возможно сделать ... ЕСЛИ сообщение опубликовано весной. Я не могу использовать аннотацию @RabbitHandler. Я думаю, потому что @RabbitHandler преобразует сообщение на основе заголовка __TypeId__, который не существует в устаревшей системе.

Как я могу смоделировать это @RabbitHandler поведение (принимая данные на основе его типа)?

Так что я использую @RabbitListener, чтобы использовать сообщение. Но @RabbitListener принимает все типы сообщений. Другая причина, по которой мы используем @RabbitListener, заключается в том, что наш обработчик ошибок зависит от Message и Channel Основная сигнатура метода, которую мы имеем, выглядит следующим образом:

    @RabbitListener(queues = "q.finance.invoice")
    public void listenInvoicePaid(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
   // convert message body JSON string to object
   // process it
}

Я пытаюсь сделать ручной отказ на основе типа, который работает. Но я уверен, что он не масштабируется, когда у меня много слушателей или очередей

import java.io.IOException;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;

import com.rabbitmq.client.Channel;

@Service
public class InvoiceListenerOnMethod {

    private static final Logger log = LoggerFactory.getLogger(InvoiceListenerOnMethod.class);

    @RabbitListener(queues = "q.finance.invoice")
    public void listenInvoiceCreated(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
            throws IOException {
        if (!StringUtils.equalsIgnoreCase("invoice.created", message.getMessageProperties().getType())) {
            log.warn("[on Method] Rejecting invoice created : {}", message);
            channel.basicReject(tag, true);
            return;
        }

        log.info("[on Method] Listening invoice created : {}", message);
    }

    @RabbitListener(queues = "q.finance.invoice")
    public void listenInvoicePaid(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
            throws IOException {
        if (!StringUtils.equalsIgnoreCase("invoice.paid", message.getMessageProperties().getType())) {
            log.warn("[on Method] Rejecting invoice paid : {}", message);
            channel.basicReject(tag, true);
            return;
        }

        log.info("[on Method] Listening invoice paid : {}", message);
    }

}

Смотрите, суть в том, что когда у меня есть 4 сообщения (платно-платно-создано-создано), слушатель может работать более 4 раз, потому что мы не можем контролировать, кто будет принимать какое сообщение. Так может быть и для listenInvoicePaid()

  • отклонять ()
  • отклонять ()
  • извед ()
  • отклонять ()
  • извед ()

И точно так же, как множественные отклонения () перед ack () также могут происходить в listenInvoiceCreated()
Таким образом, в общей сложности у меня может быть около 10 сообщений или около того, прежде чем все сообщения будут правильно обработаны.

Есть предложения по исправлению кода?

Ответы [ 3 ]

0 голосов
/ 20 июня 2019

Вы можете добавить MessagePostProcessor к свойству afterReceiveMessagePostProcessor фабрики контейнеров. В постпроцессоре вы можете проверить JSON body() и установить для заголовка __TypeId__ соответствующее имя класса.

См. этот ответ для примера.

0 голосов
/ 21 июня 2019

Возможная реализация

Вот наивный способ if-else, спасибо Марк.Это ваше предложение (1-й вариант).Что касается 2-й альтернативы, я не могу этого сделать, потому что издатель - это устаревшая система, в которой у меня нет кода

    @RabbitListener(queues = "q.finance.invoice")
    public void listenInvoiceCreated(@Payload String message, @Header(AmqpHeaders.DELIVERY_TAG) long tag,
            @Header("type") String type) throws IOException {
        if (StringUtils.equalsIgnoreCase(type, "invoice.paid")) {
            log.info("Delegate to invoice paid handler");
        } else if (StringUtils.equalsIgnoreCase(type, "invoice.created")) {
            log.info("Delegate to invoice created handler");
        } else {
            log.info("Delegate to default handler");
        }
    }

2-й вариант реализации
Вот что я реализую, спасибоГэри.Я думаю, что это более чистый подход.Далее мне нужно только извлечь постпроцессор сообщений в какой-то другой класс для удобства обслуживания, поэтому я не буду загромождать мой @RabbitListener

файл конфигурации

import java.util.Optional;

import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.course.finance.message.invoice.InvoiceCreatedMessage;
import com.course.finance.message.invoice.InvoicePaidMessage;

@Configuration
public class RabbitmqConfig {

    @Bean(name = "rabbitListenerContainerFactory")
    public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);

        factory.setAfterReceivePostProcessors(new MessagePostProcessor() {

            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                var type = message.getMessageProperties().getHeaders().get("type").toString();
                String typeId = null;

                if (StringUtils.equalsIgnoreCase(type, "invoice.paid")) {
                    typeId = InvoicePaidMessage.class.getName();
                } else if (StringUtils.equalsIgnoreCase(type, "invoice.created")) {
                    typeId = InvoiceCreatedMessage.class.getName();
                }

                Optional.ofNullable(typeId).ifPresent(t -> message.getMessageProperties().setHeader("__TypeId__", t));

                return message;
            }

        });

        return factory;
    }

    @Bean
    Jackson2JsonMessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    RabbitTemplate rabbitTemplate(Jackson2JsonMessageConverter converter, ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        return template;
    }

}

Слушатель

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import com.course.finance.message.invoice.InvoiceCreatedMessage;
import com.course.finance.message.invoice.InvoicePaidMessage;

@Service
@RabbitListener(queues = "q.finance.invoice")
public class InvoiceListener {

    private static final Logger log = LoggerFactory.getLogger(InvoiceListener.class);

    @RabbitHandler
    public void listenInvoiceCreated(InvoiceCreatedMessage message) {
        log.info("Listening invoice created : {}", message);
    }

    @RabbitHandler
    public void listenInvoicePaid(InvoicePaidMessage message) {
        log.info("Listening invoice paid : {}", message);
    }

    @RabbitHandler(isDefault = true)
    public void listenDefault(Message message) {
        log.info("Default invoice listener : {}", message.getMessageProperties().getHeaders());
    }

}
0 голосов
/ 20 июня 2019

Я не работал с весенней интеграцией кролика, но в целом идея иметь единую очередь, которая обрабатывает различные типы сообщений, звучит как нечто проблематичное:

Многие потребители потенциально могут получать сообщениятипы, которые они не могут обработать и должны будут отклонить их, чтобы сообщение возвращалось кролику, а затем снова и снова ... производительность всего кластера может ухудшиться из-за этого.

Поэтому я думаю, что есть два пути, по которым вы можете следовать:

  • Реализация единого прослушивателя, который может обрабатывать два типа сообщений.Нет необходимости менять Rabbit, но это может быть сложным рефакторингом на стороне java.

  • К счастью, Rabbit MQ очень гибок в отношении маршрутизации сообщений.Сконфигурируйте обмен для маршрутизации сообщения типа A в очередь A и сообщения типа B в очередь B на основе ключа маршрутизации, заголовка чего угодно, в Rabbit есть различные типы обменов, и вы найдете лучшую конфигурацию, которая наверняка вам подойдет,

Я бы лично пошел вторым путем.

...