Я новичок в сервисной шине Azure, я разрабатываю пример приложения публикации / подписки с использованием Java SDK. В этом упражнении я хочу опубликовать 50 сообщений (json) в очередь в следующем формате.
[{ 'фамилия' = 'test0', 'success0' 'FirstName' =} ... 'LastName' = 'test49', 'success49' 'FirstName' =}]
В приложении-получателе 25-го сообщения я хочу, чтобы это сообщение было отменено, поэтому 25-е сообщение будет доступно в очереди. Но когда я запускаю приложение, все 49 сообщений завершены и одно сообщение отправлено на
очередь мертвых писем. Я получил ниже исключения.
Either receive link to 'asb.java.pub' closed with a transient error and reopened or the delivery was already settled by complete/abandon/defer/deadletter.
2019-05-09 06:17:19 ERROR MessageAndSessionPump:241 - Completing message with sequence number '5002' failed
java.lang.IllegalArgumentException: Delivery not found on the receive link.
at com.geico.messaging.servicebus.primitives.CoreMessageReceiver.generateDeliveryNotFoundException(CoreMessageReceiver.java:1319)
at com.geico.messaging.servicebus.primitives.CoreMessageReceiver.updateMessageStateAsync(CoreMessageReceiver.java:1161)
at com.geico.messaging.servicebus.primitives.CoreMessageReceiver.completeMessageAsync(CoreMessageReceiver.java:1046)
at com.geico.messaging.servicebus.MessageReceiver.lambda$3(MessageReceiver.java:267)
at java.util.concurrent.CompletableFuture.uniComposeStage(Unknown Source)
at java.util.concurrent.CompletableFuture.thenCompose(Unknown Source)
at com.geico.messaging.servicebus.MessageReceiver.completeAsync(MessageReceiver.java:262)
at com.geico.messaging.servicebus.MessageReceiver.completeAsync(MessageReceiver.java:255)
at com.geico.messaging.servicebus.MessageAndSessionPump.lambda$6(MessageAndSessionPump.java:220)
at java.util.concurrent.CompletableFuture.uniHandle(Unknown Source)
at java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown Source)
at java.util.concurrent.CompletableFuture$Completion.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
. Вот мой код публикации
ConnectionStringBuilder connectionResolver = new ConnectionStringBuilder(connectionString, "asb.java.pub");
QueueClient sendClient = new QueueClient(connectionResolver,ReceiveMode.PEEKLOCK);
for(int i =0;i<50;i++){Message message = new Message("{'lastname' = 'test"+i+"', 'firstName' ='success"+i+"'}");
message.setLabel("name");sendClient.send(message);}sendClient.close();
вот мой код подписчика
IMessageHandler ih=new IMessageHandler() {
public CompletableFuture<Void> onMessageAsync(IMessage message) {
byte[] body = message.getBody();
Map map = GSON.fromJson(new String(body, UTF_8), Map.class);
try{ if(map.get("lastname").equals("test25")){ return receiveClient.abandonAsync(message.getLockToken());
;
}catch(Exception e){ e.printStackTrace();
}
CompletableFuture.completedFuture(null);
}
public void notifyException(Throwable throwable, ExceptionPhase exceptionPhase) {
System.out.printf(exceptionPhase + "-" + throwable.getMessage());
}
};
receiveClient.registerMessageHandler(ih, executorService);
ниже приведен скриншот после запуска приложения публикации введите описание изображения здесь
скриншот ниже после запуска приложения для подписчика. введите описание изображения здесь
У меня вопрос, почему сообщение переходит в тупик, когда я вызываю метод отмены. Не могу ли я сделать конкретное сообщение, чтобы отменить или автозаполнить false. Так что это сообщение будет доступно в очереди.
Спасибо.