Я потребляю сообщения от rabbitmq с весенним amqp.
Я потребляю одно сообщение за раз, и оно довольно медленное, потому что я сохраняю его в БД. Так что открытие и закрытие транзакций каждый раз.
Сейчас я настроил такого потребителя.
@RabbitListener(queues = "queuename")
public void receive(Message message) {
someservice.saveToDb(message);
}
Но это действительно медленно. Я хотел бы использовать кучу сообщений, прежде чем начать их сохранение. Тогда я могу открыть транзакцию. Сохраните 300, а затем подтвердите и загрузите следующую партию.
Хотелось бы что-нибудь подобное?
class MessageChannelTag {
Message message;
Channel channel;
long tag;
}
@Component
class ConsumerClass {
List<MessageChannelTag> messagesToSave = new ArrayList<>();
@RabbitListener(queues = "queuename")
public void receive(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
throws IOException {
messagesToSave.add(new MessageChannelTag(message, channel, tag));
}
@Scheduled(fixedDelay=500)
public void saveMessagesToDb() {
List saveTheese = new ArrayList(messagesToSave);
messagesToSave.clear();
service.saveMessages(saveTheese);
for(MessageChannelTag messageChannelTag:messagesToSave) {
//In the service I could mark the rows if save succeded or not and
//then out here I could ack or nack..
messageChannelTag.getChannel().basicAck(messageChannelTag.getTag(), false);
}
}
}
Или, если есть более простое решение, дайте мне знать. Я предпочитаю быстрый, простой и надежный =)