Несколько очередей AWS SQS с разными учетными данными в Spring Boot - PullRequest
2 голосов
/ 23 сентября 2019

У меня есть приложение Spring Boot, и я хочу получать сообщения из нескольких очередей AWS SQS.Все эти очереди имеют свои собственные учетные данные (и, к сожалению, я ничего не могу с этим поделать).Ни один из этих учетных данных не может получить доступ к одной из других очередей, все они ограничены ровно одной очередью.

При наличии только одной очереди и учетных данных это просто.Мне просто нужно предоставить учетные данные как AWSCredentialsProvider Bean и аннотировать мой метод с помощью @SqsListener \ @EnableSqs.
Но я не могу понять, как это сделать с несколькими учетными данными.

The *В аннотации 1009 * нет способа предоставить учетные данные, или предварительно сконфигурированный объект AmazonSqs, или что-либо еще, что могло бы помочь.

Я искал способ сопоставить очередь с учетными данными, расширяя клиент CredentialsProvider или AmazonSqs, но безрезультатно.
Я даже пытался внедрить учетные данные в заголовокAmazonHttpClient, но это также было невозможно.

Я попытался создать все необходимое для прослушивания очереди SQS вручную.Но я застрял при создании MessageHandler для SimpleMessageListenerContainer.
Требуемый QueueMessageHandler работает только при создании в виде компонента с контекстом приложения.В противном случае он не будет искать методы, помеченные @SqsListener.
К сожалению, единственные учебники или примеры, которые я смог найти, используют JMS, чего я хотел бы избежать, или просто используем аннотацию @SqsListener только с одной очередью..

Есть ли другой способ предоставить разные учетные данные для нескольких очередей?

Мой тестовый код:

@Component
@Slf4j
public class TestOneQueueA {

  public static final String QUEUE_A = "TestOneQueueA";

  public TestOneQueueA(Cloud cloud, ResourceIdResolver resourceIdResolver) {
    SqsServiceInfo serviceInfo = (SqsServiceInfo) cloud.getServiceInfo(QUEUE_A);
    AWSStaticCredentialsProvider credentialsProvider =
        new AWSStaticCredentialsProvider(new BasicAWSCredentials(serviceInfo.getAccessKey(),
            serviceInfo.getSecretAccessKey()));

    AmazonSQSAsync client = AmazonSQSAsyncClientBuilder.standard()
        .withCredentials(credentialsProvider)
        .withRegion(serviceInfo.getRegion()).build();

    QueueMessageHandlerFactory queueMessageHandlerFactory = new QueueMessageHandlerFactory();
    queueMessageHandlerFactory.setAmazonSqs(client);
    queueMessageHandlerFactory.setMessageConverters(Collections.singletonList(new MappingJackson2MessageConverter()));

    QueueMessageHandler queueMessageHandler = queueMessageHandlerFactory.createQueueMessageHandler();
    queueMessageHandler.afterPropertiesSet(); // won't do anything because of no ApplicationContext

    SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
    factory.setAmazonSqs(client);
    factory.setResourceIdResolver(resourceIdResolver);
    factory.setQueueMessageHandler(queueMessageHandler);

    SimpleMessageListenerContainer simpleMessageListenerContainer = factory.createSimpleMessageListenerContainer();
    simpleMessageListenerContainer.setMessageHandler(queueMessageHandler);
    try {
      simpleMessageListenerContainer.afterPropertiesSet();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
    simpleMessageListenerContainer.start();
    simpleMessageListenerContainer.start(QUEUE_A); // fails with "Queue with name 'TestOneQueueA' does not exist"
  }

  @SqsListener(value = QUEUE_A, deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
  public void receiveMessage(@NotificationMessage TestDto dto, @NotificationSubject String subject) {
    log.info("Received SQS Message: \nSubject: %s \n%s", subject, dto);
  }
}

Редактировать:

После попытки некоторыхБолее того, я смог внедрить своих клиентов AmazonSQS в два отдельных SimpleMessageListenerContainer.Тогда проблема стала QueueMessageHandler.

Если я создам его вручную, без контекста бина, он просто не будет искать никаких методов с аннотацией @SqsListener.И нет никакого способа установить обработчики вручную.
Если я создам его как bean-компонент, он будет проверять каждый боб для аннотации.Таким образом, он также найдет метод очереди, которую он не должен искать.И тогда это завершится сбоем, потому что учетные данные не работают.
Я не могу найти способ создать QueueMessageHandler только для одного метода SqsListener.
И SimpleMessageListenerContainer не будет принимать ничего, кромеQueueMessageHandler.

Ответы [ 2 ]

1 голос
/ 23 сентября 2019

Вы можете объявить различные @Bean s для учетных записей, которые вы хотите использовать с пользовательскими @Qualifier.Допустим, у вас есть две очереди SQS в двух разных аккаунтах.Затем объявляйте два боба типа AmazonSQS.

@Qualifier("first")
@Bean public AmazonSQS amazonSQSClient() {
    return AmazonSQSClient.builder()
            .withCredentials(credentialsProvider())
            .withRegion(Regions.US_EAST_1)
            .build();
}

@Qualifier("second")
@Bean public AmazonSQS amazonSQSClient() {
    return AmazonSQSClient.builder()
            .withCredentials(anotherCredentialsProvider())
            .withRegion(Regions.US_EAST_1)
            .build();
}

. Затем вы можете @Autowired их.

@Autowired @Qualifier("second") private AmazonSQS sqsSecond;
.
0 голосов
/ 25 сентября 2019

Потратив некоторое время на поиск лучшего решения, я придерживался следующего:

package test;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.regions.AwsRegionProvider;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder;
import test.TestDto;
import test.CustomQueueMessageHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.aws.messaging.config.SimpleMessageListenerContainerFactory;
import org.springframework.cloud.aws.messaging.config.annotation.NotificationMessage;
import org.springframework.cloud.aws.messaging.config.annotation.NotificationSubject;
import org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer;
import org.springframework.cloud.aws.messaging.listener.SqsMessageDeletionPolicy;
import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
import org.springframework.stereotype.Component;

@Component
public class TestQueue {

  private static final String QUEUE_NAME = "TestQueue";
  private static final Logger log = LoggerFactory.getLogger(TestQueue.class);

  public TestQueue(AWSCredentialsProvider credentialsProvider, AwsRegionProvider regionProvider) {
    AmazonSQSAsync client = AmazonSQSAsyncClientBuilder.standard()
        .withCredentials(credentialsProvider)
        .withRegion(regionProvider.getRegion())
        .build();

    // custom QueueMessageHandler to initialize only this queue
    CustomQueueMessageHandler queueMessageHandler = new CustomQueueMessageHandler();
    queueMessageHandler.init(this);
    queueMessageHandler.afterPropertiesSet();

    SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
    factory.setAmazonSqs(client);
    factory.setQueueMessageHandler(queueMessageHandler);

    SimpleMessageListenerContainer simpleMessageListenerContainer = factory.createSimpleMessageListenerContainer();
    simpleMessageListenerContainer.setMessageHandler(queueMessageHandler);
    try {
      simpleMessageListenerContainer.afterPropertiesSet();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
    simpleMessageListenerContainer.start();
  }

  @SqsListener(value = QUEUE_NAME, deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
  public void receiveMessage(@NotificationMessage TestDto dto, @NotificationSubject String subject) {
    log.info("Received SQS Message: \nSubject: {} \n{}", subject, dto);
  }
}

и пользовательского QueueMessageHandler:

package test;

import java.util.Collections;
import org.springframework.cloud.aws.messaging.listener.QueueMessageHandler;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;

public class CustomQueueMessageHandler extends QueueMessageHandler {

  public void init(Object handler) {
    detectHandlerMethods(handler);
  }
}

Единственная цель CustomQueueMessageHandler должен передать один объект, где он должен сканировать аннотации SQS.Поскольку я не запускаю его с Spring Context, он не будет искать в каждом бине аннотацию @SqsListener.Но вся инициализация скрыта за защищенными методами.Вот почему мне нужно было перезаписать класс, чтобы получить доступ к этим методам init.

Я не думаю, что это очень элегантное решение, создающее все клиентские вещи AWS вручную и вызывающее bean-методы init.Но это было единственное решение, которое я смог найти и которое по-прежнему предоставляет доступ ко всем функциям библиотеки AWS SQS, таким как преобразование входящих сообщений и уведомлений, политика удаления, опрос очереди, включая обработку сбоев и т. Д.

...