AWS SQS Listener - автоматический выключатель - PullRequest
0 голосов
/ 01 мая 2018

Я создал SQS Listener, который опрашивает сообщения с SQS и отправляет сообщения в службу REST. Если служба REST не работает, я не хочу удалять сообщения из очереди, пока она не будет готова. Я использую Spring Cloud для чтения сообщений с SQS. Есть ли примеры того, как этого можно достичь?

Я настроил Dead Letter Queue для этой очереди. Я не хочу, чтобы все сообщения заканчивались в DLQ, если бэкэнд-сервис не работает.

@Configuration
public class SqsListenerConfiguration {

    @Bean
    @ConfigurationProperties(prefix = "aws.configuration")
    public ClientConfiguration clientConfiguration() {
        return new ClientConfiguration();
    }


    @Bean
    @Primary
    public AWSCredentialsProvider awsCredentialsProvider() {

        ProfileCredentialsProvider credentialsProvider = new ProfileCredentialsProvider("credential");
        try {
            credentialsProvider.getCredentials();
            System.out.println(credentialsProvider.getCredentials().getAWSAccessKeyId());
            System.out.println(credentialsProvider.getCredentials().getAWSSecretKey());

        } catch (Exception e) {
            throw new AmazonClientException(
                    "Cannot load the credentials from the credential profiles file. " +
                            "Please make sure that your credentials file is at the correct " +
                            "location (~/.aws/credentials), and is in valid format.",
                    e);
        }
        return credentialsProvider;
    }


    @Bean
    @Primary
    public AmazonSQSAsync amazonSQSAsync() {
        return AmazonSQSAsyncClientBuilder.standard().
                withCredentials(awsCredentialsProvider()).
                withClientConfiguration(clientConfiguration()).
                build();
    }


    @Bean
    @ConfigurationProperties(prefix = "aws.queue")
    public SimpleMessageListenerContainer simpleMessageListenerContainer(AmazonSQSAsync amazonSQSAsync) {
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
        simpleMessageListenerContainer.setAmazonSqs(amazonSQSAsync);
        simpleMessageListenerContainer.setMessageHandler(queueMessageHandler());
        simpleMessageListenerContainer.setMaxNumberOfMessages(10);
        simpleMessageListenerContainer.setTaskExecutor(threadPoolTaskExecutor());
        return simpleMessageListenerContainer;
    }


    @Bean
    public QueueMessageHandler queueMessageHandler() {
        QueueMessageHandlerFactory queueMessageHandlerFactory = new QueueMessageHandlerFactory();
        queueMessageHandlerFactory.setAmazonSqs(amazonSQSAsync());
        QueueMessageHandler queueMessageHandler = queueMessageHandlerFactory.createQueueMessageHandler();
        return queueMessageHandler;
    }


    @Bean
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(1);
        executor.setMaxPoolSize(1);
        executor.setThreadNamePrefix("oaoQueueExecutor");
        executor.initialize();
        return executor;
    }


    @Bean
    public QueueMessagingTemplate messagingTemplate(@Autowired AmazonSQSAsync amazonSQSAsync) {
        return new QueueMessagingTemplate(amazonSQSAsync);
    }


}

Слушатель

    @SqsListener(value = "${oao.sqs.url}", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
    public void onMessage(String serviceData, @Header("MessageId") String messageId, @Header("ApproximateFirstReceiveTimestamp") String approximateFirstReceiveTimestamp) {

        System.out.println(" Data = " + serviceData + " MessageId = " + messageId);

        repository.execute(serviceData);
}
...