aws sns to sqs, сообщение не конвертируется - PullRequest
0 голосов
/ 03 сентября 2018

Я пытаюсь настроить производителя SNS с потребителем SQS, созданного с помощью загрузочного приложения Spring. Я попытался использовать облако Spring и просто использовать JMS API, и мне кажется, что я не могу заставить моего потребителя преобразовать сообщение для чтения. В Spring cloud я получаю сообщение об ошибке, в котором говорится, что «полезная нагрузка не является действительным уведомлением. Код ниже: любая помощь очень ценится

Конфигурация SNS:

@Bean
public AmazonSNS snsClient() {
     AmazonSNS snsClient = AmazonSNSClientBuilder.standard()
          .withEndpointConfiguration(<endpoint-stuff)).build();
    return snsClient;
}

@Bean
public NotificationMessagingTemplate notificationMessagingTemplate(
        AmazonSNS amazonSNS) {
    return new NotificationMessagingTemplate(amazonSNS);
}

код производителя:

@Autowired
NotificationMessagingTemplate messagingTemplate;

public void send() throws Exception {

    MessageDto message = new MessageDto();
    message.setMessageA("Hello");
    message.setMessageB("Again");

    messagingTemplate.sendNotification("my_topic",message,"Test");    
}

SQS потребительская конфигурация:

@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer() 
 {
    SimpleMessageListenerContainer msgListenerContainer = 
     simpleMessageListenerContainerFactory()
            .createSimpleMessageListenerContainer();
    msgListenerContainer.setMessageHandler(queueMessageHandler());
    return msgListenerContainer;
}

@Bean
public SimpleMessageListenerContainerFactory 
    simpleMessageListenerContainerFactory() {
    SimpleMessageListenerContainerFactory msgListenerContainerFactory = 
    new SimpleMessageListenerContainerFactory();
    msgListenerContainerFactory.setAmazonSqs(amazonSQSClient());
    return msgListenerContainerFactory;
}


@Bean
public QueueMessageHandler queueMessageHandler() {
    QueueMessageHandlerFactory queueMsgHandlerFactory = new 
    QueueMessageHandlerFactory();
    queueMsgHandlerFactory.setAmazonSqs(amazonSQSClient());
    QueueMessageHandler queueMessageHandler = 
      queueMsgHandlerFactory.createQueueMessageHandler();
    //List<HandlerMethodArgumentResolver> list = new ArrayList<>();
    //HandlerMethodArgumentResolver resolver = new 
    //PayloadArgumentResolver(new MappingJackson2MessageConverter());
    //list.add(resolver);
    //queueMessageHandler.setArgumentResolvers(list);
    return queueMessageHandler;


}




@Lazy
@Bean(name = "amazonSQS", destroyMethod = "shutdown")
public AmazonSQSAsync amazonSQSClient() {
    AwsClientBuilder.EndpointConfiguration endpointConfiguration = new 
    AwsClientBuilder.EndpointConfiguration(
            "<url>, "<region");
    AmazonSQSAsync awsSQSAsync = 

    AmazonSQSAsyncClientBuilder.standard()
    .withEndpointConfiguration(endpointConfiguration)
           .build();
    return awsSQSAsync;
}

SQS слушатель:

@SqsListener(value = "my_queue", deletionPolicy = 
 SqsMessageDeletionPolicy.ON_SUCCESS)
public void consume(final @NotificationMessage MessageDto event) {
    LOGGER.info("Message received {} 
         {}",event.getMessageA(),event.getMessageB());
}

трассировка стека:

Caused by: 
org.springframework.messaging.converter.MessageConversionException: 
Payload: '<payload here>' is not a valid notification
at 
org.springframework.cloud.aws.messaging.support.
converter.NotificationRequestConverter.fromMessage
(NotificationRequestConverter.java:69)
at
...