Spring Cloud AWS Проблема с настройкой ручного подтверждения сообщения SQS - PullRequest
0 голосов
/ 17 октября 2018

Я пытаюсь реализовать логику с ручным удалением сообщения AWS SQS, используя spring-cloud-aws-messaging.Эта функция была реализована в рамках этого билета из примера в тестах

@SqsListener(value = "queueName", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
public void listen(SqsEventDTO message, Acknowledgment acknowledgment) {

    LOGGER.info("Received message {}", message.getFoo());
    try {
        acknowledgment.acknowledge().get();
    } catch (InterruptedException e) {
        LOGGER.error("Opps", e);
    } catch (ExecutionException e) {
        LOGGER.error("Opps", e);
    }
}

Но столкнулся с неожиданным исключением

com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance oforg.springframework.cloud.aws.messaging.listener.Acknowledgement (no Creators, like default construct, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information

Решение с SqsMessageDeletionPolicy.ON_SUCCESS работает, но я хочу избежать исключения.

Что я пропустил в конфигурации?

1 Ответ

0 голосов
/ 24 декабря 2018

Потребовалось немного повозиться и попробовать разные вещи из других ответов SO.

Вот мой код, и я постараюсь объяснить как можно лучше.Я включаю все, что я использую для своего потребителя SQS.

Мой класс конфигурации приведен ниже.Единственная неочевидная вещь, которую следует отметить ниже, - это объекты преобразователя и преобразователя, созданные в методе queueMessageHandlerFactory.Класс MappingJackson2MessageConverter (в случае, если это не очевидно из слишком очевидного имени класса) обрабатывает десериализацию полезной нагрузки из SQS.

Также важно, чтобы для строгого соответствия типа содержимого было установлено значение false.

Кроме того, MappingJackson2MessageConverter позволяет вам установить свой собственный объект Jackson ObjectMapper, однако, если вы сделаете это, вам потребуется настроить его следующим образом:

objectMapper.configure(MapperFeature.DEFAULT_VIEW_INCLUSION, false);
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

Возможно, вы не захотите этого делать,так что вы можете оставить его пустым, и он создаст свой собственный ObjectMapper.

Я думаю, остальная часть кода довольно понятна ...?Дайте мне знать, если нет.

Одно различие между нашими вариантами использования, похоже, вы отображаете свой собственный объект (SqsEventDTO), и я предполагаю, что это работает?В этом случае я не думаю, что вам понадобится MappingJackson2MessageConverter, но я могу ошибаться.

@Configuration
public class AppConfig {

@Bean
@Primary
public QueueMessageHandler queueMessageHandler(@Autowired QueueMessageHandlerFactory queueMessageHandlerFactory) {
    return queueMessageHandlerFactory.createQueueMessageHandler();
}

@Bean
@Primary
public QueueMessageHandlerFactory queueMessageHandlerFactory(@Autowired AmazonSQSAsync sqsClient) {

    QueueMessageHandlerFactory factory = new QueueMessageHandlerFactory();
    factory.setAmazonSqs(sqsClient);

    MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();
    messageConverter.setSerializedPayloadClass(String.class);

    //set strict content type match to false
    messageConverter.setStrictContentTypeMatch(false);

    // Uses the MappingJackson2MessageConverter object to resolve/map 
    // the payload against the Message/S3EventNotification argument.
    PayloadArgumentResolver payloadResolver = new PayloadArgumentResolver(messageConverter);

    // Extract the acknowledgment data from the payload's headers, 
    // which then gets deserialized into the Acknowledgment object.  
    AcknowledgmentHandlerMethodArgumentResolver acknowledgmentResolver = new AcknowledgmentHandlerMethodArgumentResolver("Acknowledgment");

    // I don't remember the specifics of WHY, however there is 
    // something important about the order of the argument resolvers 
    // in the list
    factory.setArgumentResolvers(Arrays.asList(acknowledgmentResolver, payloadResolver));

    return factory;
}

@Bean("ConsumerBean")
@Primary
public SimpleMessageListenerContainer simpleMessageListenerContainer(@Autowired AmazonSQSAsync amazonSQSAsync, @Autowired QueueMessageHandler queueMessageHandler,
    @Autowired ThreadPoolTaskExecutor threadPoolExecutor) {

    SimpleMessageListenerContainer smlc = new SimpleMessageListenerContainer();
    smlc.setWaitTimeOut(20);
    smlc.setAmazonSqs(amazonSQSAsync);
    smlc.setMessageHandler(queueMessageHandler);
    smlc.setBeanName("ConsumerBean");
    smlc.setMaxNumberOfMessages(sqsMaxMessages);
    smlc.setTaskExecutor(threadPoolExecutor);

    return smlc;
}

@Bean
@Primary
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

    executor.setCorePoolSize(corePoolSize);
    executor.setAllowCoreThreadTimeOut(coreThreadsTimeout);
    executor.setWaitForTasksToCompleteOnShutdown(true);
    executor.setMaxPoolSize(maxPoolSize);
    executor.setKeepAliveSeconds(threadTimeoutSeconds);
    executor.setThreadNamePrefix(threadName);
    executor.initialize();

    return executor;
}
}

Мой класс обслуживания SQS ниже.

@Service
public class RawConsumer {

@SqsListener(deletionPolicy = SqsMessageDeletionPolicy.NEVER, value = "${input.sqs.queuename}")
public void sqsListener(S3EventNotification event, Acknowledgment ack) throws Exception {
    // Handle event here
}

Надеюсь, это поможет, дайте мне знать, если у вас есть какие-либо проблемы.

...