Spring AMQP, CorrelationId и GZipPostProcessor: UnsupportedEncodingException - PullRequest
0 голосов
/ 27 марта 2019

У меня есть проект с Spring AMQP (1.7.12.RELEASE). Если я помещаю значение в поле correlationId (etMessageProperties (). SetCorrelationId) и использую GZipPostProcessor, всегда возникает следующая ошибка:

"org.springframework.amqp.AmqpUnsupportedEncodingException: java.io.UnsupportedEncodingException: gzip"

Чтобы решить эту проблему, кажется, что она работает, используя следующий код:

DefaultMessagePropertiesConverter messageConverter = new DefaultMessagePropertiesConverter();
    messageConverter.setCorrelationIdAsString(DefaultMessagePropertiesConverter.CorrelationIdPolicy.STRING);
template.setMessagePropertiesConverter(messageConverter);

но я не знаю, какие последствия это будет иметь в действительности для клиентов, которые не используют Spring AMQP (я устанавливаю это поле, если оно есть в сообщении, которое мне пришло). Я прилагаю полный пример кода:

@Configuration
public class SimpleProducerGZIP 
{
    static final String queueName = "spring-boot";

    @Bean
    public CachingConnectionFactory connectionFactory() {
        com.rabbitmq.client.ConnectionFactory factory = new com.rabbitmq.client.ConnectionFactory();
        factory.setHost("localhost");
        factory.setAutomaticRecoveryEnabled(false);
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(factory);
        return connectionFactory;
    }

    @Bean
    public AmqpAdmin amqpAdmin() {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin ;
    }

    @Bean
    Queue queue() {
        Queue qr = new Queue(queueName, false);
        qr.setAdminsThatShouldDeclare(amqpAdmin());
        return qr;
    }

    @Bean 
    public RabbitTemplate rabbitTemplate() 
    {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setEncoding("gzip");
        template.setBeforePublishPostProcessors(new GZipPostProcessor());

         // TODO : 
        DefaultMessagePropertiesConverter messageConverter = new DefaultMessagePropertiesConverter();
        messageConverter.setCorrelationIdAsString(DefaultMessagePropertiesConverter.CorrelationIdPolicy.STRING);
        template.setMessagePropertiesConverter(messageConverter);

        return template;
    }

    public static void main(String[] args) 
    {
        @SuppressWarnings("resource")
        ApplicationContext context = new AnnotationConfigApplicationContext(SimpleProducerGZIP.class);
        RabbitTemplate _rabbitTemplate = context.getBean(RabbitTemplate.class);
        int contador = 0;
        try {
            while(true) 
            {
                contador = contador + 1;
                int _nContador = contador;
                System.out.println("\nInicio envio : " + _nContador);
                Object _o = new String(("New Message : " + contador));
                try
                {
                    _rabbitTemplate.convertAndSend(queueName, _o,
                            new MessagePostProcessor() {
                                @SuppressWarnings("deprecation")
                                @Override
                                public Message postProcessMessage(Message msg) throws AmqpException {
                                    if(_nContador%2 == 0) {
                                        System.out.println("\t--- msg.getMessageProperties().setCorrelationId ");
                                        msg.getMessageProperties().setCorrelationId("NewCorrelation".getBytes(StandardCharsets.UTF_8));
                                    }
                                    return msg;
                                }
                            }
                    );   
                    System.out.println("\tOK");
                }catch (Exception e) {
                    System.err.println("\t\tError en envio : " + contador + " - " + e.getMessage());
                }

                System.out.println("Fin envio : " + contador);
                Thread.sleep(500);
            }
        }catch (Exception e) {
            System.err.println("Exception : " + e.getMessage());
        }
    }
}

Вопрос в том, если я изменю конфигурацию rabbitTemplate так, чтобы ошибка не возникала, может ли это иметь значение для клиентов, которые используют Spring AMQP или другие альтернативы?

--- РЕДАКТИРОВАТЬ (28/03/2019) Это полная трассировка стека с кодом:

org.springframework.amqp.AmqpUnsupportedEncodingException: java.io.UnsupportedEncodingException: gzip
    at org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter.fromMessageProperties(DefaultMessagePropertiesConverter.java:211)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doSend(RabbitTemplate.java:1531)
    at org.springframework.amqp.rabbit.core.RabbitTemplate$3.doInRabbit(RabbitTemplate.java:716)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1455)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1411)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:712)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:813)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:791)
    at es.jab.example.SimpleProducerGZIP.main(SimpleProducerGZIP.java:79)
Caused by: java.io.UnsupportedEncodingException: gzip
    at java.lang.StringCoding.decode(Unknown Source)
    at java.lang.String.<init>(Unknown Source)
    at java.lang.String.<init>(Unknown Source)
    at org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter.fromMessageProperties(DefaultMessagePropertiesConverter.java:208)
    ... 8 more

1 Ответ

1 голос
/ 27 марта 2019

Мне было бы интересно посмотреть полную трассировку стека для получения дополнительной информации о проблеме.

Этот код был частью перехода от byte[] Id корреляции к String.Это было необходимо, чтобы избежать конвертации byte[]/String/byte[].

Если для политики установлено значение String, вам следует использовать свойство correlationIdString вместо correlationId.В противном случае correlationId не будет отображаться в исходящих сообщениях (в этом случае мы не смотрим на correlationId).Для входящих сообщений он определяет, какое свойство заполняется.

В 2.0 и более поздних версиях correlationId теперь является String вместо byte[], поэтому этот параметр больше не требуется.

РЕДАКТИРОВАТЬ

Теперь я вижу трассировку стека, это ...

template.setEncoding("gzip");

... неверно.

/**
 * The encoding to use when inter-converting between byte arrays and Strings in message properties.
 *
 * @param encoding the encoding to set
 */
public void setEncoding(String encoding) {
    this.encoding = encoding;
}

Естьнет таких Charset как gzip.Это свойство не имеет ничего общего с содержимым сообщения, оно просто используется при преобразовании byte[] в / из String.По умолчанию это UTF-8.

...