Потребовалось немного повозиться и попробовать разные вещи из других ответов SO.
Вот мой код, и я постараюсь объяснить как можно лучше.Я включаю все, что я использую для своего потребителя SQS.
Мой класс конфигурации приведен ниже.Единственная неочевидная вещь, которую следует отметить ниже, - это объекты преобразователя и преобразователя, созданные в методе queueMessageHandlerFactory.Класс MappingJackson2MessageConverter (в случае, если это не очевидно из слишком очевидного имени класса) обрабатывает десериализацию полезной нагрузки из SQS.
Также важно, чтобы для строгого соответствия типа содержимого было установлено значение false.
Кроме того, MappingJackson2MessageConverter позволяет вам установить свой собственный объект Jackson ObjectMapper, однако, если вы сделаете это, вам потребуется настроить его следующим образом:
objectMapper.configure(MapperFeature.DEFAULT_VIEW_INCLUSION, false);
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
Возможно, вы не захотите этого делать,так что вы можете оставить его пустым, и он создаст свой собственный ObjectMapper.
Я думаю, остальная часть кода довольно понятна ...?Дайте мне знать, если нет.
Одно различие между нашими вариантами использования, похоже, вы отображаете свой собственный объект (SqsEventDTO), и я предполагаю, что это работает?В этом случае я не думаю, что вам понадобится MappingJackson2MessageConverter, но я могу ошибаться.
@Configuration
public class AppConfig {
@Bean
@Primary
public QueueMessageHandler queueMessageHandler(@Autowired QueueMessageHandlerFactory queueMessageHandlerFactory) {
return queueMessageHandlerFactory.createQueueMessageHandler();
}
@Bean
@Primary
public QueueMessageHandlerFactory queueMessageHandlerFactory(@Autowired AmazonSQSAsync sqsClient) {
QueueMessageHandlerFactory factory = new QueueMessageHandlerFactory();
factory.setAmazonSqs(sqsClient);
MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();
messageConverter.setSerializedPayloadClass(String.class);
//set strict content type match to false
messageConverter.setStrictContentTypeMatch(false);
// Uses the MappingJackson2MessageConverter object to resolve/map
// the payload against the Message/S3EventNotification argument.
PayloadArgumentResolver payloadResolver = new PayloadArgumentResolver(messageConverter);
// Extract the acknowledgment data from the payload's headers,
// which then gets deserialized into the Acknowledgment object.
AcknowledgmentHandlerMethodArgumentResolver acknowledgmentResolver = new AcknowledgmentHandlerMethodArgumentResolver("Acknowledgment");
// I don't remember the specifics of WHY, however there is
// something important about the order of the argument resolvers
// in the list
factory.setArgumentResolvers(Arrays.asList(acknowledgmentResolver, payloadResolver));
return factory;
}
@Bean("ConsumerBean")
@Primary
public SimpleMessageListenerContainer simpleMessageListenerContainer(@Autowired AmazonSQSAsync amazonSQSAsync, @Autowired QueueMessageHandler queueMessageHandler,
@Autowired ThreadPoolTaskExecutor threadPoolExecutor) {
SimpleMessageListenerContainer smlc = new SimpleMessageListenerContainer();
smlc.setWaitTimeOut(20);
smlc.setAmazonSqs(amazonSQSAsync);
smlc.setMessageHandler(queueMessageHandler);
smlc.setBeanName("ConsumerBean");
smlc.setMaxNumberOfMessages(sqsMaxMessages);
smlc.setTaskExecutor(threadPoolExecutor);
return smlc;
}
@Bean
@Primary
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setAllowCoreThreadTimeOut(coreThreadsTimeout);
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setMaxPoolSize(maxPoolSize);
executor.setKeepAliveSeconds(threadTimeoutSeconds);
executor.setThreadNamePrefix(threadName);
executor.initialize();
return executor;
}
}
Мой класс обслуживания SQS ниже.
@Service
public class RawConsumer {
@SqsListener(deletionPolicy = SqsMessageDeletionPolicy.NEVER, value = "${input.sqs.queuename}")
public void sqsListener(S3EventNotification event, Acknowledgment ack) throws Exception {
// Handle event here
}
Надеюсь, это поможет, дайте мне знать, если у вас есть какие-либо проблемы.