У меня ServiceBus в azure с двумя очередями: первая - это когда клиент отправляет сообщения (мне нужно перехватить сообщения. QUEUE_IN
), а вторая - очередь, в которую клиент получает сообщения, которые я отправлено, когда я заканчиваю sh для обработки сообщений в первом (QUEUE_OUT
).
Мне нужно все время слушать QUEUE_IN
, и когда они отправили сообщение, обработали его и отправили сообщение на QUEUE_OUT
с результатом.
Проблема, с которой я столкнулся, заключается в том, что я не могу перехватить сообщения в QUEUE_IN
, когда я запускаю программу, я знаю, что в очереди есть сообщения, но программа не может их видеть.
private void recibirMensaje() throws Exception {
QueueClient queueIn= new QueueClient(new ConnectionStringBuilder(stringConection, nameQueue),
ReceiveMode.PEEKLOCK);
ExecutorService executorService = Executors.newSingleThreadExecutor();
this.interceptMessage(queueIn, executorService);
recibirMensajeCliente.close();
executorService.shutdown();
}
private void interceptMessage(QueueClient queueIn, ExecutorService executorService) throws Exception {
queueIn.registerMessageHandler(new IMessageHandler() {
public CompletableFuture<Void> onMessageAsync(IMessage message) {
if (message.getLabel() != null && message.getContentType() != null
&& message.getLabel().contentEquals(etiquetaMensajes)
&& message.getContentType().contentEquals("application/json")) {
byte[] body = message.getBody();
logger.info("Printing message{}", body);
}
return CompletableFuture.completedFuture(null);
}
public void notifyException(Throwable throwable, ExceptionPhase exceptionPhase) {
}
},
// 1 concurrent call, messages are auto-completed, auto-renew duration
new MessageHandlerOptions(1, true, Duration.ofMinutes(1)), executorService);
}
Я не публикую метод отправки, потому что у меня нет проблем с ним. Я не знаю, как постоянно слушать queue_in
и обрабатывать сообщения или как узнать, есть ли в очереди сообщения для обработки и как их перехватить.