У меня есть устаревшая система, которая отправляет сообщение в RabbitMQ.
Система использует только одну очередь: q.finance.invoice
, но у нее есть два типа сообщений, где тип сообщения доступен в заголовке.
Первый тип
Type : invoice.created
{
"field_1" : "",
"field_2" : "",
}
Второй тип
Type : invoice.paid
{
"field_5" : "",
"field_6" : "",
}
Так что теперь мой потребитель должен обрабатывать сообщение выборочно на основе типа данных.
Spring имеет @RabbitHandler
, что возможно сделать ... ЕСЛИ сообщение опубликовано весной.
Я не могу использовать аннотацию @RabbitHandler
.
Я думаю, потому что @RabbitHandler
преобразует сообщение на основе заголовка __TypeId__
, который не существует в устаревшей системе.
Как я могу смоделировать это @RabbitHandler
поведение (принимая данные на основе его типа)?
Так что я использую @RabbitListener
, чтобы использовать сообщение.
Но @RabbitListener
принимает все типы сообщений.
Другая причина, по которой мы используем @RabbitListener
, заключается в том, что наш обработчик ошибок зависит от Message
и Channel
Основная сигнатура метода, которую мы имеем, выглядит следующим образом:
@RabbitListener(queues = "q.finance.invoice")
public void listenInvoicePaid(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
// convert message body JSON string to object
// process it
}
Я пытаюсь сделать ручной отказ на основе типа, который работает. Но я уверен, что он не масштабируется, когда у меня много слушателей или очередей
import java.io.IOException;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;
import com.rabbitmq.client.Channel;
@Service
public class InvoiceListenerOnMethod {
private static final Logger log = LoggerFactory.getLogger(InvoiceListenerOnMethod.class);
@RabbitListener(queues = "q.finance.invoice")
public void listenInvoiceCreated(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
throws IOException {
if (!StringUtils.equalsIgnoreCase("invoice.created", message.getMessageProperties().getType())) {
log.warn("[on Method] Rejecting invoice created : {}", message);
channel.basicReject(tag, true);
return;
}
log.info("[on Method] Listening invoice created : {}", message);
}
@RabbitListener(queues = "q.finance.invoice")
public void listenInvoicePaid(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
throws IOException {
if (!StringUtils.equalsIgnoreCase("invoice.paid", message.getMessageProperties().getType())) {
log.warn("[on Method] Rejecting invoice paid : {}", message);
channel.basicReject(tag, true);
return;
}
log.info("[on Method] Listening invoice paid : {}", message);
}
}
Смотрите, суть в том, что когда у меня есть 4 сообщения (платно-платно-создано-создано), слушатель может работать более 4 раз, потому что мы не можем контролировать, кто будет принимать какое сообщение. Так может быть и для listenInvoicePaid()
- отклонять ()
- отклонять ()
- извед ()
- отклонять ()
- извед ()
И точно так же, как множественные отклонения () перед ack () также могут происходить в listenInvoiceCreated()
Таким образом, в общей сложности у меня может быть около 10 сообщений или около того, прежде чем все сообщения будут правильно обработаны.
Есть предложения по исправлению кода?