Выпуск тестирования весеннего облака SQS Listener - PullRequest
0 голосов
/ 01 июня 2018

Среда

  • Spring Boot: 1.5.13.RELEASE
  • Облако: Edgware.SR3
  • Облако AWS: 1.2.2.RELEASE
  • Java 8
  • OSX 10.13.4

Проблема

Я пытаюсь написать интеграционный тест для SQS.

У меня естьлокально работающий localstack докер-контейнер с SQS, работающим на TCP/4576

В моем тестовом коде я определяю SQS-клиента с конечной точкой, установленной на локальный 4576, и могу успешно подключиться и создать очередь, отправитьсообщение и удалить очередь.Я также могу использовать клиент SQS для получения сообщений и получения отправленного мной сообщения.

Моя проблема в том, что если я удаляю код, который получает сообщение вручную, чтобы позволить другому компоненту получить сообщение, то, похоже, ничего не происходит.У меня есть пружинный компонент, аннотированный следующим образом:

Слушатель

@Component
public class MyListener {
@SqsListener(value = "my_queue", deletionPolicy = ON_SUCCESS)
    public void receive(final MyMsg msg) {
        System.out.println("GOT THE MESSAGE: "+ msg.toString());
    }
}

Test

@RunWith(SpringRunner.class)
@SpringBootTest(properties = "spring.profiles.active=test")
public class MyTest {

    @Autowired
    private AmazonSQSAsync amazonSQS;

    @Autowired
    private SimpleMessageListenerContainer container;

    private String queueUrl;

    @Before
    public void setUp() {
        queueUrl = amazonSQS.createQueue("my_queue").getQueueUrl();
    }

    @After
    public void tearDown() {
        amazonSQS.deleteQueue(queueUrl);
    }

    @Test
    public void name() throws InterruptedException {
        amazonSQS.sendMessage(new SendMessageRequest(queueUrl, "hello"));
        System.out.println("isRunning:" + container.isRunning());
        System.out.println("isActive:" + container.isActive());
        System.out.println("isRunningOnQueue:" + container.isRunning("my_queue"));
        Thread.sleep(30_000);
        System.out.println("GOT MESSAGE: " + amazonSQS.receiveMessage(queueUrl).getMessages().size());
    }

    @TestConfiguration
    @EnableSqs
    public static class SQSConfiguration {

        @Primary
        @Bean(destroyMethod = "shutdown")
        public AmazonSQSAsync amazonSQS() {
            final AwsClientBuilder.EndpointConfiguration endpoint = new AwsClientBuilder.EndpointConfiguration("http://127.0.0.1:4576", "eu-west-1");
            return new AmazonSQSBufferedAsyncClient(AmazonSQSAsyncClientBuilder
                    .standard()
                    .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("key", "secret")))
                    .withEndpointConfiguration(endpoint)
                    .build());
        }
    }
}

В журналах испытаний я вижу:

oscamlistener.QueueMessageHandler: 1 методов обработки сообщений, найденных в классе MyListener: {public void MyListener.receive (MyMsg) =org.springframework.cloud.aws.messaging.listener.QueueMessageHandler$MappingInformation@1cd408231 22:50 201-05-05: 39.582 ИНФОРМАЦИЯ 16329 ---

oscamlistener.QueueMessageHandler: Mapped "org.springframework.cloud.aws.messaging.listener.QueueMessageHandler$MappingInformation@1cd4082a" на общедоступный void *L * * 1039 MyMistens1040 *

Далее:

isRunning: true

isActive: true

isRunningOnQueue: false

ПОЛУЧЕННОЕ СООБЩЕНИЕ: 1

Это показывает, что за 30 секундную паузу между отправкой сообщения контейнер не забрал его, и когда я вручную опрашиваю сообщение, оно находится в очереди, и я могу его использовать.

Мойвопрос в том, почему не вызывается слушатель и почему строка isRunningOnQueue:false указывает на то, что он не запускается автоматически для этой очереди?

Обратите внимание, что я также попытался установить свой собственный компонент SimpleMessageListenerContainer с автоматическим запуском, установленным в значение true (все равно по умолчанию), и не заметил никаких изменений в поведении.Я подумал, что org.springframework.cloud.aws.messaging.config.annotation.SqsConfiguration#simpleMessageListenerContainer, установленный @EnableSqs, должен настроить автозапуск SimpleMessageListenerContainer, который должен опрашивать меня.

Я также установил

logging.level.org.apache.http=DEBUG
logging.level.org.springframework.cloud=DEBUG

в моих свойствах теста я вижу, что HTTP-вызовы создают очередь, отправляют сообщение, удаляют и т. д., но нет HTTP-вызовов (кроме моего ручного в конце теста).

1 Ответ

0 голосов
/ 02 июня 2018

Я понял это после некоторой обработки.

Даже если фабрика контейнеров простых сообщений настроена на автоматический запуск, она все равно выполняет инициализацию, которая включает определение наличия очереди.

В этом случае очередь создается в моем тесте в методе настройки - но, к сожалению, это происходит после настройки контекста пружины, что означает, что возникает исключение.

Я исправил это, просто переместивсоздание очереди для создания контекста клиента SQS (что происходит до создания контейнера сообщений).то есть:

@Bean(destroyMethod = "shutdown")
        public AmazonSQSAsync amazonSQS() {
            final AwsClientBuilder.EndpointConfiguration endpoint = new AwsClientBuilder.EndpointConfiguration("http://localhost:4576", "eu-west-1");
            final AmazonSQSBufferedAsyncClient client = new AmazonSQSBufferedAsyncClient(AmazonSQSAsyncClientBuilder
                    .standard()
                    .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("dummyKey", "dummySecret")))
                    .withEndpointConfiguration(endpoint)
                    .build());
            client.createQueue("test-queue");
            return client;
        }
...