Я использую amqp клиент 5.5.1 в своем коде java. Я объявил очередь, используя аргумент приоритета:
RMQMessageHandler.declareQueue()---> method
Map<String, Object> args = new HashMap<>();
args.put("x-max-priority", 10);
channel.queueDeclare(rmqQueueName, true, false, false, args);
И добавил потребителя для использования сообщений очереди:
public void consumeBrokerMessage(final String rmqQueueName) throws IOException {
declareQueue(rmqQueueName);
log.info(" [*] Waiting for messages. To exit press CTRL+C");
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(
final String consumerTag,
final Envelope envelope,
final AMQP.BasicProperties properties,
final byte[] body) throws IOException {
boolean flag = false;
final String brokerMessage = new String(body, StandardCharsets.UTF_8);
if (null != properties.getPriority()) {
//setting value in pojo object
} else {
//setting 0 value in pojo object
}
//logic
final long deliveryTag = envelope.getDeliveryTag();
// positively acknowledge a single delivery, the message will
// be discarded
channel.basicAck(deliveryTag, false);
}
};
channel.basicQos(1);
channel.basicConsume(rmqQueueName, false, consumer);
}
Когда я выполняю свой код, но получаю исключение ниже:
java.io.IOException: null
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:129)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:125)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:147)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:962)
at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.queueDeclare(AutorecoveringChannel.java:333)
at com.springernature.acdc.transformation.rmq.RMQMessageHandler.declareQueue(RMQMessageHandler.java:103)
at com.springernature.acdc.transformation.rmq.RMQMessageHandler.consumeBrokerMessage(RMQMessageHandler.java:49)
at com.springernature.acdc.transformation.ACDCTransformationEngine.start(ACDCTransformationEngine.java:28)
at com.springernature.acdc.transformation.ACDCTransformationMain.main(ACDCTransformationMain.java:17)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-max-priority' for queue 'acdc-transformation' in vhost '736e9777-65f3-499d-a94e-abc9e3b91135': received the value '10' of type 'signedint' but current is none, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:499)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:292)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141)
... 6 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-max-priority' for queue 'acdc-transformation' in vhost '736e9777-65f3-499d-a94e-abc9e3b91135': received the value '10' of type 'signedint' but current is none, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:516)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:346)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114)
at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:672)
at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:48)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:599)
at java.lang.Thread.run(Thread.java:748)
Не могли бы вы сообщить мне, где я делаю неправильно в моем коде?