Обработка сообщения ответа в AsyncRabbitTemplate - PullRequest
1 голос
/ 06 марта 2019

У меня проблемы с обработкой сообщений GZip / GUnzip при использовании AsyncRabbitTemplate.

Все работает нормально с синхронной настройкой шаблона, например так:

@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
    final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMessageConverter(jsonConverter());
    rabbitTemplate.setReplyTimeout(config.getRabbitSendAndReceiveReplyTimeout());
    rabbitTemplate.setReceiveTimeout(config.getRabbitSendAndReceiveReceiveTimeout());
    rabbitTemplate.setAfterReceivePostProcessors(new GUnzipPostProcessor(true));
    rabbitTemplate.setBeforePublishPostProcessors(new GZipPostProcessor(true));
    return rabbitTemplate;
}

Однако, когда янастроить асинхронный шаблон следующим образом:

@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public AsyncRabbitTemplate rabbitTemplateAsync(final ConnectionFactory connectionFactory) {
    final AsyncRabbitTemplate asyncRabbitTemplate = new AsyncRabbitTemplate(rabbitTemplate(connectionFactory));
    // need to manually start the reply listener container for some reason
    asyncRabbitTemplate.start();
    return asyncRabbitTemplate;
}

Ответное сообщение не распаковывается должным образом, и я получаю это сообщение об ошибке

Caused by: java.io.UnsupportedEncodingException: gzip:UTF-8
    at java.lang.StringCoding.decode(Unknown Source) ~[?:1.8.0_192]
    at java.lang.String.<init>(Unknown Source) ~[?:1.8.0_192]
    at java.lang.String.<init>(Unknown Source) ~[?:1.8.0_192]
    at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.convertBytesToObject(AbstractJackson2MessageConverter.java:235) ~[spring-amqp-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.fromMessage(AbstractJackson2MessageConverter.java:199) ~[spring-amqp-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.amqp.rabbit.AsyncRabbitTemplate.onMessage(AsyncRabbitTemplate.java:576) ~[spring-rabbit-2.1.4.RELEASE.jar:2.1.4.RELEASE]

Я пытался дать AsyncRabbitTemplate настроенный DirectReplyToMessageListenerContainer, но это не помогает

    final DirectReplyToMessageListenerContainer directReplyToMessageListenerContainer = new DirectReplyToMessageListenerContainer(
            connectionFactory);
    directReplyToMessageListenerContainer.setAfterReceivePostProcessors(new GUnzipPostProcessor(true));
    final AsyncRabbitTemplate asyncRabbitTemplate = new AsyncRabbitTemplate(rabbitTemplate(connectionFactory),
            directReplyToMessageListenerContainer);

Это просто приводит к этой ошибке:

[ОШИБКА] 2019-03-06 12: 18: 05.192 [Соединение AMQP 172.17.3.6: 5672] CachingConnectionFactory.log - отключение канала: ошибка канала;Метод протокола: #method (код ответа = 406, текст ответа = PRECONDITION_FAILED - потребитель быстрого ответа не существует, идентификатор класса = 60, идентификатор метода = 40)

Обратите внимание, что я былв состоянии заставить все работать, взяв ветку проекта spring-rabbit и добавив этот конструктор в AsyncRabbitTemplate:

public IndigoAsyncRabbitTemplate(final RabbitTemplate template,
        final DirectReplyToMessageListenerContainer directReplyToContainer) {
    Assert.notNull(template, "'template' cannot be null");
    this.template = template;
    container = null;
    replyAddress = null;
    this.directReplyToContainer = directReplyToContainer;
    directReplyToContainer.setMessageListener(this);
}

Итак, нужно ли усовершенствовать библиотеку spring rabbit, чтобы она начала работать?Или есть способ заставить GUnzip работать с прослушивателем ответов, не перепрыгивая через слишком много обручей?

1 Ответ

0 голосов
/ 06 марта 2019

Правильно, это должно пойти как улучшение в структуре.Мы просто пропускаем факт afterReceivePostProcessors в случае AsyncRabbitTemplate.Мы можем переконфигурировать внутренний DirectReplyToMessageListenerContainer для использования afterReceivePostProcessors из предоставленного RabbitTemplate.

. Тем временем вы можете придерживаться обычной SimpleMessageListenerContainer инъекции.Или вы можете попробовать с помощью внешней DirectReplyToMessageListenerContainer инъекции.

См. Этот ctor:

/**
 * Construct an instance using the provided arguments. The first queue the container
 * is configured to listen to will be used as the reply queue. Replies will be
 * routed using the default exchange with that queue name as the routing key.
 * @param template a {@link RabbitTemplate}
 * @param container a {@link AbstractMessageListenerContainer}.
 */
public AsyncRabbitTemplate(RabbitTemplate template, AbstractMessageListenerContainer container) {
    this(template, container, null);
}

Проблема по этому вопросу: https://github.com/spring-projects/spring-amqp/issues/920

...