Клиент Java RabbitMQ зависает при повторной отправке через поток обратного вызова коммитов производителя после nack из-за несуществующего обмена - PullRequest
0 голосов
/ 29 мая 2018

В настоящее время я экспериментирую со сценариями сбоев, которые могут возникнуть при обмене данными через посредник сообщений RabbitMQ.Цель состоит в том, чтобы оценить, как такое взаимодействие можно сделать более устойчивым.

В частности, я хочу вызвать подтверждение отсутствия (не-подтверждения) при отправке сообщений в режиме фиксации производителя .Для этого я отправляю сообщение на несуществующий обмен через Spring AMQP RabbitTemplate.send.В обратном вызове, предоставленном через RabbitTemplate.setConfirmCallback, я затем обрабатываю ack=false подтверждений, отправляя сообщение на существующий обмен (имитируя, что я позаботился о причине отсутствия ответа).

Пример класса и связанный с ним тест приведены ниже, полный пример проекта можно найти в моем репозитории github .Я использую RabbitMQ 3.6 и Spring Boot / AMQP 2.0.2.

При запуске теста обратный вызов вызывается с ack=false, как и ожидалось.Однако повторная отправка сообщения зависает при повторном создании канала (с исключением тайм-аута через 10 минут).Дамп стека вызовов и журналы приведены ниже.

Решение проблемы, по-видимому, заключается в отправке сообщения в другой ветке, как это предлагается здесь .Если вы раскомментируете строку service.runInSeparateThread = true; в тесте, все сработает!

Однако я не совсем понимаю, почему что-то не работает, и я нигде не читал об этой практике, кроме как вышеупомянутый пост.Это ожидаемое поведение или ошибка?Может кто-нибудь объяснить детали?

Большое спасибо за ваш совет!

Снимок стека вызовов :

 "AMQP Connection 127.0.0.1:5672@3968" prio=5 tid=0xe nid=NA waiting
 java.lang.Thread.State: WAITING
  at java.lang.Object.wait(Object.java:-1)
  at com.rabbitmq.utility.BlockingCell.get(BlockingCell.java:73)
  at com.rabbitmq.utility.BlockingCell.uninterruptibleGet(BlockingCell.java:120)
  at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
  at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:494)
  at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:288)
  at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:138)
  at com.rabbitmq.client.impl.ChannelN.open(ChannelN.java:133)
  at com.rabbitmq.client.impl.ChannelManager.createChannel(ChannelManager.java:176)
  at com.rabbitmq.client.impl.AMQConnection.createChannel(AMQConnection.java:542)
  at org.springframework.amqp.rabbit.connection.SimpleConnection.createChannel(SimpleConnection.java:57)
  at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createBareChannel(CachingConnectionFactory.java:1156)
  at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.access$200(CachingConnectionFactory.java:1144)
  at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.doCreateBareChannel(CachingConnectionFactory.java:585)
  at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createBareChannel(CachingConnectionFactory.java:568)
  at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getCachedChannelProxy(CachingConnectionFactory.java:538)
  at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getChannel(CachingConnectionFactory.java:520)
  at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.access$1500(CachingConnectionFactory.java:94)
  at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createChannel(CachingConnectionFactory.java:1161)
  at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1803)
  at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1771)
  at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:859)
  ...

Журналы:

...
10:21:24.613 [main] DEBUG org.springframework.amqp.rabbit.core.RabbitAdmin - declaring Exchange 'ExistentExchange'
10:21:24.630 [main] INFO com.example.rabbitmq.ProducerService - sending `initial Message`
10:21:24.648 [main] DEBUG org.springframework.amqp.rabbit.support.PublisherCallbackChannelImpl - Added listener org.springframework.amqp.rabbit.core.RabbitTemplate$MockitoMock$952329793@562c877a
10:21:24.648 [main] DEBUG org.springframework.amqp.rabbit.core.RabbitTemplate - Added publisher confirm channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://guest@127.0.0.1:5672/,1), conn: Proxy@3013909b Shared Rabbit Connection: SimpleConnection@12db3386 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 1341] to map, size now 1
10:21:24.649 [main] DEBUG org.springframework.amqp.rabbit.core.RabbitTemplate - Executing callback RabbitTemplate$$Lambda$175/1694519286 on RabbitMQ Channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://guest@127.0.0.1:5672/,1), conn: Proxy@3013909b Shared Rabbit Connection: SimpleConnection@12db3386 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 1341]
10:21:24.649 [main] DEBUG org.springframework.amqp.rabbit.core.RabbitTemplate - Publishing message (Body:'[B@67001148(byte[15])' MessageProperties [headers={}, contentType=application/octet-stream, contentLength=0, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])on exchange [nonExistentExchange], routingKey = [nonExistentQueue]
10:21:24.659 [main] INFO com.example.rabbitmq.ProducerService - done with sending message
10:21:24.675 [AMQP Connection 127.0.0.1:5672] DEBUG org.springframework.amqp.rabbit.support.PublisherCallbackChannelImpl - PublisherCallbackChannelImpl: AMQChannel(amqp://guest@127.0.0.1:5672/,1) PC:Nack:(close):1
10:21:24.677 [AMQP Connection 127.0.0.1:5672] DEBUG org.springframework.amqp.rabbit.support.PublisherCallbackChannelImpl - Sending confirm PendingConfirm [correlationData=null cause=channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'nonExistentExchange' in vhost '/', class-id=60, method-id=40)]
10:21:24.677 [AMQP Connection 127.0.0.1:5672] INFO com.example.rabbitmq.ProducerService - In confirm callback, ack=false, cause=channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'nonExistentExchange' in vhost '/', class-id=60, method-id=40), correlationData=null
10:21:24.677 [AMQP Connection 127.0.0.1:5672] INFO com.example.rabbitmq.ProducerService - sending `resend Message`
10:21:24.678 [AMQP Connection 127.0.0.1:5672] DEBUG org.springframework.amqp.rabbit.support.PublisherCallbackChannelImpl - PublisherCallbackChannelImpl: AMQChannel(amqp://guest@127.0.0.1:5672/,1) PC:Nack:(close):1
10:21:24.679 [AMQP Connection 127.0.0.1:5672] DEBUG org.springframework.amqp.rabbit.support.PublisherCallbackChannelImpl - AMQChannel(amqp://guest@127.0.0.1:5672/,1) No listener for seq:1
10:21:24.679 [AMQP Connection 127.0.0.1:5672] DEBUG org.springframework.amqp.rabbit.core.RabbitTemplate - Removed publisher confirm channel: PublisherCallbackChannelImpl: AMQChannel(amqp://guest@127.0.0.1:5672/,1) from map, size now 0
10:21:24.679 [AMQP Connection 127.0.0.1:5672] DEBUG org.springframework.amqp.rabbit.core.RabbitTemplate - Removed publisher confirm channel: PublisherCallbackChannelImpl: AMQChannel(amqp://guest@127.0.0.1:5672/,1) from map, size now 0
10:21:24.679 [AMQP Connection 127.0.0.1:5672] DEBUG org.springframework.amqp.rabbit.support.PublisherCallbackChannelImpl - PendingConfirms cleared 

ProducerService :

@Service
public class ProducerService {

    static final String EXISTENT_EXCHANGE = "ExistentExchange";
    private static final String NON_EXISTENT_EXCHANGE = "nonExistentExchange";
    private static final String QUEUE_NAME = "nonExistentQueue";
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final RabbitTemplate rabbitTemplate;
    private final Executor executor = Executors.newCachedThreadPool();
    boolean runInSeparateThread = false;

    public ProducerService(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setConfirmCallback(this::confirmCallback);
    }

    private void confirmCallback(CorrelationData correlationData, boolean ack, String cause) {
        logger.info("In confirm callback, ack={}, cause={}, correlationData={}", ack, cause, correlationData);
        if (!ack) {
            if (runInSeparateThread) {
                executor.execute(() -> sendMessage("resend Message", EXISTENT_EXCHANGE));
            } else {
                sendMessage("resend Message", EXISTENT_EXCHANGE);
            }
        } else {
            logger.info("sending was acknowledged");
        }
    }

    public void produceMessage() {
        sendMessage("initial Message", NON_EXISTENT_EXCHANGE);
    }

    private void sendMessage(String messageBody, String exchangeName) {
        logger.info("sending `{}`", messageBody);
        rabbitTemplate.send(exchangeName, QUEUE_NAME, new Message(messageBody.getBytes(), new MessageProperties()));
        logger.info("done with sending message");
    }

}

ProducerServiceTest :

@RunWith(SpringRunner.class)
@ContextConfiguration(classes = {RabbitAutoConfiguration.class, ProducerService.class})
@DirtiesContext
public class ProducerServiceTest {

    @Autowired
    private ProducerService service;
    @SpyBean
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private AmqpAdmin amqpAdmin;
    @Autowired
    private CachingConnectionFactory cachingConnectionFactory;

    @Before
    public void setup() {
        cachingConnectionFactory.setPublisherConfirms(true);
        amqpAdmin.declareExchange(new DirectExchange(ProducerService.EXISTENT_EXCHANGE));
    }

    @After
    public void cleanup() {
        amqpAdmin.deleteExchange(ProducerService.EXISTENT_EXCHANGE);
    }

    @Test
    public void sendMessageToNonexistentExchange() throws InterruptedException {
        final CountDownLatch sentMessagesLatch = new CountDownLatch(2);
        final List<Message> sentMessages = new ArrayList<>();
        doAnswer(invocation -> {
            invocation.callRealMethod();
            sentMessages.add(invocation.getArgument(2));
            sentMessagesLatch.countDown();
            return null;
        }).when(rabbitTemplate).send(anyString(), anyString(), any(Message.class));

//        service.runInSeparateThread = true;
        service.produceMessage();
        sentMessagesLatch.await();

        List<String> messageBodies = sentMessages.stream().map(message -> new String(message.getBody())).collect(toList());
        assertThat(messageBodies, equalTo(Arrays.asList("initial Message", "resend Message")));
    }

}

1 Ответ

0 голосов
/ 29 мая 2018

Полагаю, это можно считать ошибкой, но это артефакт способа кеширования каналов для повышения производительности.Проблема состоит в том, что попытка публикации на канале в том же потоке, который доставляет подтверждение для того же канала, вызывает тупик в клиентской библиотеке.

У нас есть открытая проблема , чтобы рассмотретьрешение (по другой причине);мы просто не дошли до этого.AFAIK, вы только второй пользователь, который сделал это более чем за 6 лет, так как мы добавили поддержку для подтверждений и возвратов.

EDIT

На самом деле, это другоеситуация;канал не используется повторно, так как канал закрыт.Он пытается создать новый канал, и это то, что зашло в тупик.Я не понимаю, как мы (Spring AMQP) можем что-либо сделать;это ограничение Java-клиента;Вы не можете выполнять операции в потоке ack.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...