У меня есть приложение Spring Boot, и я хочу получать сообщения из нескольких очередей AWS SQS.Все эти очереди имеют свои собственные учетные данные (и, к сожалению, я ничего не могу с этим поделать).Ни один из этих учетных данных не может получить доступ к одной из других очередей, все они ограничены ровно одной очередью.
При наличии только одной очереди и учетных данных это просто.Мне просто нужно предоставить учетные данные как AWSCredentialsProvider
Bean и аннотировать мой метод с помощью @SqsListener
\ @EnableSqs
.
Но я не могу понять, как это сделать с несколькими учетными данными.
The *В аннотации 1009 * нет способа предоставить учетные данные, или предварительно сконфигурированный объект AmazonSqs
, или что-либо еще, что могло бы помочь.
Я искал способ сопоставить очередь с учетными данными, расширяя клиент CredentialsProvider
или AmazonSqs
, но безрезультатно.
Я даже пытался внедрить учетные данные в заголовокAmazonHttpClient, но это также было невозможно.
Я попытался создать все необходимое для прослушивания очереди SQS вручную.Но я застрял при создании MessageHandler для SimpleMessageListenerContainer
.
Требуемый QueueMessageHandler
работает только при создании в виде компонента с контекстом приложения.В противном случае он не будет искать методы, помеченные @SqsListener
.
К сожалению, единственные учебники или примеры, которые я смог найти, используют JMS, чего я хотел бы избежать, или просто используем аннотацию @SqsListener
только с одной очередью..
Есть ли другой способ предоставить разные учетные данные для нескольких очередей?
Мой тестовый код:
@Component
@Slf4j
public class TestOneQueueA {
public static final String QUEUE_A = "TestOneQueueA";
public TestOneQueueA(Cloud cloud, ResourceIdResolver resourceIdResolver) {
SqsServiceInfo serviceInfo = (SqsServiceInfo) cloud.getServiceInfo(QUEUE_A);
AWSStaticCredentialsProvider credentialsProvider =
new AWSStaticCredentialsProvider(new BasicAWSCredentials(serviceInfo.getAccessKey(),
serviceInfo.getSecretAccessKey()));
AmazonSQSAsync client = AmazonSQSAsyncClientBuilder.standard()
.withCredentials(credentialsProvider)
.withRegion(serviceInfo.getRegion()).build();
QueueMessageHandlerFactory queueMessageHandlerFactory = new QueueMessageHandlerFactory();
queueMessageHandlerFactory.setAmazonSqs(client);
queueMessageHandlerFactory.setMessageConverters(Collections.singletonList(new MappingJackson2MessageConverter()));
QueueMessageHandler queueMessageHandler = queueMessageHandlerFactory.createQueueMessageHandler();
queueMessageHandler.afterPropertiesSet(); // won't do anything because of no ApplicationContext
SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
factory.setAmazonSqs(client);
factory.setResourceIdResolver(resourceIdResolver);
factory.setQueueMessageHandler(queueMessageHandler);
SimpleMessageListenerContainer simpleMessageListenerContainer = factory.createSimpleMessageListenerContainer();
simpleMessageListenerContainer.setMessageHandler(queueMessageHandler);
try {
simpleMessageListenerContainer.afterPropertiesSet();
} catch (Exception e) {
throw new RuntimeException(e);
}
simpleMessageListenerContainer.start();
simpleMessageListenerContainer.start(QUEUE_A); // fails with "Queue with name 'TestOneQueueA' does not exist"
}
@SqsListener(value = QUEUE_A, deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void receiveMessage(@NotificationMessage TestDto dto, @NotificationSubject String subject) {
log.info("Received SQS Message: \nSubject: %s \n%s", subject, dto);
}
}
Редактировать:
После попытки некоторыхБолее того, я смог внедрить своих клиентов AmazonSQS в два отдельных SimpleMessageListenerContainer
.Тогда проблема стала QueueMessageHandler
.
Если я создам его вручную, без контекста бина, он просто не будет искать никаких методов с аннотацией @SqsListener
.И нет никакого способа установить обработчики вручную.
Если я создам его как bean-компонент, он будет проверять каждый боб для аннотации.Таким образом, он также найдет метод очереди, которую он не должен искать.И тогда это завершится сбоем, потому что учетные данные не работают.
Я не могу найти способ создать QueueMessageHandler
только для одного метода SqsListener.
И SimpleMessageListenerContainer
не будет принимать ничего, кромеQueueMessageHandler
.