У меня есть очередь в RabbitMq, которая подписывается на обмен с результатами некоторых вычислений.Одно вычисление может дать несколько результатов, и я хочу обрабатывать сообщения, содержащие результаты, принадлежащие одному вычислению, либо все, либо ни одного из них.В случае сбоя службы никто не получает ACK, поэтому он будет помещен в очередь и обработан другим экземпляром службы.
Мой потребитель:
class ResultConsumer(channel: Channel,
private val resultListener: ResultListener)
: DefaultConsumer(channel) {
override fun handleDelivery(consumerTag: String, envelope: Envelope, properties: AMQP.BasicProperties, body: ByteArray) {
val result = String(body)
// ....
if (status != null && status.equals("success")) {
if (allSubresultsDelivered(result)) {
channel.basicAck(envelope.deliveryTag, true)
}
return
}
}
}
Но это такне работает, потому что потребитель не получает никакого другого сообщения, пока не подтвердит (или отклонит) предыдущее сообщение.
Что не так с моим кодом или моим пониманием?