Среда
- Spring Boot: 1.5.13.RELEASE
- Облако: Edgware.SR3
- Облако AWS: 1.2.2.RELEASE
- Java 8
- OSX 10.13.4
Проблема
Я пытаюсь написать интеграционный тест для SQS.
У меня естьлокально работающий localstack докер-контейнер с SQS, работающим на TCP/4576
В моем тестовом коде я определяю SQS-клиента с конечной точкой, установленной на локальный 4576, и могу успешно подключиться и создать очередь, отправитьсообщение и удалить очередь.Я также могу использовать клиент SQS для получения сообщений и получения отправленного мной сообщения.
Моя проблема в том, что если я удаляю код, который получает сообщение вручную, чтобы позволить другому компоненту получить сообщение, то, похоже, ничего не происходит.У меня есть пружинный компонент, аннотированный следующим образом:
Слушатель
@Component
public class MyListener {
@SqsListener(value = "my_queue", deletionPolicy = ON_SUCCESS)
public void receive(final MyMsg msg) {
System.out.println("GOT THE MESSAGE: "+ msg.toString());
}
}
Test
@RunWith(SpringRunner.class)
@SpringBootTest(properties = "spring.profiles.active=test")
public class MyTest {
@Autowired
private AmazonSQSAsync amazonSQS;
@Autowired
private SimpleMessageListenerContainer container;
private String queueUrl;
@Before
public void setUp() {
queueUrl = amazonSQS.createQueue("my_queue").getQueueUrl();
}
@After
public void tearDown() {
amazonSQS.deleteQueue(queueUrl);
}
@Test
public void name() throws InterruptedException {
amazonSQS.sendMessage(new SendMessageRequest(queueUrl, "hello"));
System.out.println("isRunning:" + container.isRunning());
System.out.println("isActive:" + container.isActive());
System.out.println("isRunningOnQueue:" + container.isRunning("my_queue"));
Thread.sleep(30_000);
System.out.println("GOT MESSAGE: " + amazonSQS.receiveMessage(queueUrl).getMessages().size());
}
@TestConfiguration
@EnableSqs
public static class SQSConfiguration {
@Primary
@Bean(destroyMethod = "shutdown")
public AmazonSQSAsync amazonSQS() {
final AwsClientBuilder.EndpointConfiguration endpoint = new AwsClientBuilder.EndpointConfiguration("http://127.0.0.1:4576", "eu-west-1");
return new AmazonSQSBufferedAsyncClient(AmazonSQSAsyncClientBuilder
.standard()
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("key", "secret")))
.withEndpointConfiguration(endpoint)
.build());
}
}
}
В журналах испытаний я вижу:
oscamlistener.QueueMessageHandler: 1 методов обработки сообщений, найденных в классе MyListener: {public void MyListener.receive (MyMsg) =org.springframework.cloud.aws.messaging.listener.QueueMessageHandler$MappingInformation@1cd408231 22:50 201-05-05: 39.582 ИНФОРМАЦИЯ 16329 ---
oscamlistener.QueueMessageHandler: Mapped "org.springframework.cloud.aws.messaging.listener.QueueMessageHandler$MappingInformation@1cd4082a" на общедоступный void *L * * 1039 MyMistens1040 *
Далее:
isRunning: true
isActive: true
isRunningOnQueue: false
ПОЛУЧЕННОЕ СООБЩЕНИЕ: 1
Это показывает, что за 30 секундную паузу между отправкой сообщения контейнер не забрал его, и когда я вручную опрашиваю сообщение, оно находится в очереди, и я могу его использовать.
Мойвопрос в том, почему не вызывается слушатель и почему строка isRunningOnQueue:false
указывает на то, что он не запускается автоматически для этой очереди?
Обратите внимание, что я также попытался установить свой собственный компонент SimpleMessageListenerContainer
с автоматическим запуском, установленным в значение true (все равно по умолчанию), и не заметил никаких изменений в поведении.Я подумал, что org.springframework.cloud.aws.messaging.config.annotation.SqsConfiguration#simpleMessageListenerContainer
, установленный @EnableSqs
, должен настроить автозапуск SimpleMessageListenerContainer
, который должен опрашивать меня.
Я также установил
logging.level.org.apache.http=DEBUG
logging.level.org.springframework.cloud=DEBUG
в моих свойствах теста я вижу, что HTTP-вызовы создают очередь, отправляют сообщение, удаляют и т. д., но нет HTTP-вызовов (кроме моего ручного в конце теста).