Миграция SqsConsumer с пружины-aws - с весны на интеграцию- aws - PullRequest
1 голос
/ 08 апреля 2020

Я хочу использовать spring -gration- aws для отправки сообщений на AWS SNS и для получения сообщений от AWS SQS. У меня возникают некоторые проблемы с пониманием того, как перенести мое приложение из-под Spring-coud- aws -позволяет использовать его. Мой код в основном является классом конфигурации SQS:

@Configuration
@EnableConfigurationProperties(SqsProperties.class)
@Profile("!test")
public class SqsConfiguration {

     private final SqsProperties sqsProperties;  

     @Autowired  
     public SqsConfiguration(SqsProperties sqsProperties) {  
          this.sqsProperties = sqsProperties;  
     }  

     @Bean  
     public SimpleMessageListenerContainer simpleMessageListenerContainer() {  
          SimpleMessageListenerContainer msgListenerContainer =  
                simpleMessageListenerContainerFactory().createSimpleMessageListenerContainer();  
          msgListenerContainer.setMessageHandler(queueMessageHandler());  
          return msgListenerContainer;  
     }  

     @Bean  
     public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory() {  
          SimpleMessageListenerContainerFactory msgListenerContainerFactory = new SimpleMessageListenerContainerFactory();  
      msgListenerContainerFactory.setAmazonSqs(amazonSQSClient());  
      msgListenerContainerFactory.setDestinationResolver(customDestinationResolver(sqsProperties));  
     return msgListenerContainerFactory;  
      }  

        @Bean  
      public QueueMessageHandler queueMessageHandler() {  
            QueueMessageHandlerFactory queueMsgHandlerFactory = new QueueMessageHandlerFactory();  
      queueMsgHandlerFactory.setAmazonSqs(amazonSQSClient());  
     return queueMsgHandlerFactory.createQueueMessageHandler();  
      }  


        @Bean(name = "amazonSQS", destroyMethod = "shutdown")  
        public AmazonSQSAsync amazonSQSClient() {  
            return AmazonSQSAsyncClientBuilder.defaultClient();  
      }  

        @Bean  
      public SqsQueuesDestinationResolver customDestinationResolver(SqsProperties sqsProperties) {  
            return new SqsQueuesDestinationResolver(sqsProperties);  
      }  
    }

, который использует customDestinationResolver (в основном потому, что имена очередей могут быть динамическими c только из-за различных развертываний), определяемыми как:

    public class SqsQueuesDestinationResolver implements DestinationResolver<String> {  

        private static final Logger LOG = LoggerFactory.getLogger(SqsQueuesDestinationResolver.class);  

     private SqsProperties sqsProperties;  

      @Autowired  
      private AmazonSQSAsync amazonSQSAsync;  

     public SqsQueuesDestinationResolver(SqsProperties sqsProperties) {  
            this.sqsProperties = sqsProperties;  
      }  

        @Override  
      public String resolveDestination(String queueName) throws DestinationResolutionException {  
            String finalQueueName = getFinalQueueName(queueName);  
     try {  
                return amazonSQSAsync.getQueueUrl(finalQueueName).getQueueUrl();  
      }  
            catch (QueueDoesNotExistException queueDoesNotExistException) {  
                LOG.error(String.format("The '%s' queue was not found.", finalQueueName));  
      }  
            return null;  
      }  

        private String getFinalQueueName(String queueName) {  
            String finalQueueName;  
     switch (queueName) {  
                case "queue-foo":  
                    finalQueueName = sqsProperties.getFooQueue();  
     break; case "queue-bar":  
                    finalQueueName = sqsProperties.getBarQueue();  
     break;  
     default:  
                    finalQueueName = null;  
      }  
            return finalQueueName;  
      }  
    }

И это все: в основном с этой конфигурацией мне просто нужно использовать аннотацию @SqsListener("foo-queue") или @SqsListener("bar-queue") в подходящем потребительском методе

@SqsListener("foo-queue")
public void listen(String message){
     processMessage(message);
}

Я пытался следовать документации в https://github.com/spring-projects/spring-integration-aws#spring -integrations-extensions-to- aws и то, что я пытаюсь понять в части SQS, в главе «Адаптер входящего канала», какие очереди следует указывать в качестве аргумента для конструктора SqsMessageDrivenChannelAdapter () так как я использую пользовательский destinationResolver и как именно я получаю сообщения или если он должен работать с аннотацией @SqsListener из Spring-cloud- aws -messaging, как и раньше.

Tks много для помощь, и если это не то место, чтобы спросить, или если это очень тупой вопрос, извините, я просто пытаюсь его впервые:)

1 Ответ

0 голосов
/ 08 апреля 2020

Я не уверен, что вам не хватает, но вы все еще можете использовать свои foo-queue и bar-queue в SqsMessageDrivenChannelAdapter(AmazonSQSAsync amazonSqs, String... queues) ctor.

Ваш пользовательский DestinationResolver действительно может быть использован там а также:

public void setDestinationResolver(DestinationResolver<String> destinationResolver) {
    this.simpleMessageListenerContainerFactory.setDestinationResolver(destinationResolver);
}

На самом деле этот SqsMessageDrivenChannelAdapter полностью основан на том же SimpleMessageListenerContainerFactory.

Чтобы использовать сообщения, полученные от SQS, вам просто нужно настроить этот адаптер канала с MessageChannel для создания сообщения. Затем вы подписываетесь на этот канал в нисходящем направлении.

Подробнее о каналах и сообщениях см. В Справочном руководстве по интеграции Spring: https://docs.spring.io/spring-integration/docs/5.3.0.M4/reference/html/index.html

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...