Шаблон Spring Retry, блокирующий мои очереди ответа - PullRequest
1 голос
/ 01 апреля 2020

ЧТО IAM ПЫТАЕТСЯ ДОСТИГНУТЬ У меня есть REST Call из пользовательского интерфейса, который призывает добавить пользователя. Таким образом, пользователь должен будет создать асинхронную очередь c (это ограничение), но затем подождать очередь ответов в течение настроенного времени и обработать ее, прежде чем результат будет отправлен обратно в пользовательский интерфейс. Если очередь возвращается с пустым ссылочным номером, то я должен удалить запись пользователя и выдать исключение, говоря, что пользователь недействителен. Если ответ возвращается с действительной ссылкой (или если тайм-аут случается), то я предполагаю, что он действителен и возвращает успех.

У меня есть приложение, в котором я отправляю сообщение очереди, чтобы получить referenceNumber для моего пользователя Объект. А затем дождитесь ответа очереди, прежде чем отвечать обратно на вызов REST. Но мне нужно подождать, пока настроенное время ответа очереди вернется.

UserManagerImpl

// REST CALL to persist
public User Persist(User user) {
...
...
 // Building the message for sending to QUEUE
 UserEnvelopeV1_0 userEnvelope =buildUserEnvelope(user);
// This is the place i send the queue message
userQueueClient.send(userEnvelope);
// Update Request time
updateRequestDetails(user.getUserId);
// This is the call i am going retry
boolean userValid = userRetryTemplate.doUserReferenceRetry(userId);
if (!userValid ) {
                  //remove User Object
                  throw Exception
                }
...
}

// update the request time for reference Number
private void updateRequestDetails(String userId) {
 User user = userRepository.findById(userId);
        if (user != null) {
            user.setRefRequestDateItem(DateHelper.createXMLGregorianCalendar());
            userRepository.saveAndFlush(user);
        }

public void updateReference(String userId, String referenceNumber) {

        User user = userRepository.findById(userId);
        if (user != null) {
            user.setReference(referenceNumber);
            user.setResponseDate(DateHelper.createXMLGregorianCalendar());
            userRepository.saveAndFlush(user);
        }
    }

UserQueueClient:

@Component
public class UserQueueClient {



    @JmsListener(id = "#{T(java.util.UUID).nameUUIDFromBytes('${in.res}",
            destination = "${in.res}", containerFactory = "containerFactory")
    public void receive(Message message, UserEnvelopeV1_0 envelope) throws{


        try {
            String userId = envelope.getHeader().getMessageIdentification().getUserId();
 ApplicationInformationStructure applicationInformation = envelope.getBody().getApplicationInformation();

if(CollectionUtils.isNotEmpty(applicationInformation.getApplicationInformationResult())) {
          String referenceNumber = applicationInformation.getApplicationInformationResult().getRefNumber();      

                userManager.updateReference(userId, referenceNumber);
            }

        } catch (Exception e) {
            //
        }
    }

    @Transactional(propagation = Propagation.MANDATORY)
    public void send(UserEnvelopeV1_0 sarsSoapEnvelope) throws JMSException {


        envelope.setHeader();

        Message message = sendToQueue(envelope, requestQueue, responseQueue,
                userId);

        applicationEventPublisher.publishEvent(new MessageLogEvent("USER_GET_REF_NUMBER", message, MessageType.XML,
                requestQueue, MessageDirection.SEND, true, false, new Date(), userId));

    }
}

UserRetryTemplate



@Component
public class UserRetryTemplate {


    @Value("${retry.max.attempts:5}")
    private int maxAttempts;

    @Value("${response.waiting.time.in.seconds:60}")
    private long maxDelay;

    @Autowired
    private UserRepository userRepository;

    private static final long INITIAL_INTERVAL = 2000L;

    public RetryTemplate retryTemplate() {

        // Max timeout in milliseconds
        long maxTimeout = maxDelay*1000;

        //double multiplier = (maxTimeout - INITIAL_INTERVAL)/((maxAttempts-2)*6000);

        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(maxAttempts);


        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(maxTimeout/(maxAttempts-1));

        RetryTemplate template = new RetryTemplate();
        template.setRetryPolicy(retryPolicy);
        template.setBackOffPolicy(backOffPolicy);
        return template;
    }

    public boolean doUserReferenceRetry(String userId) {
        boolean isUserReferenceValid = true;
        try {
            boolean isValidUser = retryTemplate().execute(context -> {
                logger.info("Attempted {} times", context.getRetryCount());
                User user = userRepository.findById(userId);
                logger.info("User Retry :" + user);

                if (user.getResponseDateItem() == null || user.getReferenceNumber == null) {
                    logger.info("response not yet received");
                    throw new IllegalStateException("User Response not yet received");
                }
                if (user.getReferenceNumber != null)) {
                    return true;
                }
                throw new IllegalStateException("Response not yet received");
            });
            return isUserReferenceValid ;
        } catch (IllegalArgumentException e) {

        }
        return true;
    }

}

Итак, я реализовал logi c, где я отправлю сообщение очереди и выполню Spring-повторение (для настроенного времени), чтобы проверить база данных, если referenceNumber обновляется в БД. Также, когда ответ очереди вернется, я обновлю БД с помощью referenceNumber.

Но, когда я реализовал вышеуказанную логику c, повторная попытка пружины продолжает повторяться до настроенного времени, но мое приложение Spring не обрабатывает никаких очередей ответа. Есть ли способ, как приложение Spring может запускать оба процесса параллельно.

Проблема заключается в том, что, если я удалю механизм повторной попытки пружины, очередь ответов обрабатывает мой ответ и обновляет запись пользователя со ссылочным номером.

Но когда я добавил логи повторных попыток c, очередь ответов больше не обрабатывает мою очередь.

1 Ответ

0 голосов
/ 01 апреля 2020

Я обнаружил, что нижняя строка сбивает с толку.

", где я отправлю сообщение очереди и выполню Spring-повторение (для настроенного времени), чтобы проверить базу данных, обновляется ли referenceNumber в БД. Кроме того, когда ответ из очереди возвращается, я обновлю базу данных с помощью referenceNumber. "

В одной строке вы говорите, что ожидаете обновления ссылочного номера, а в другой строке вы говорите, что вы обновляют базу данных. Кто здесь продюсер? есть две разные темы? В этом случае вы - производитель и потребитель.

Если вы хотите заблокировать текущий поток на настроенное время. Можете ли вы рассмотреть метод блокировки очереди с опросом (длительное время ожидания, единица времени)

poll(long timeout, TimeUnit unit) – retrieves and removes the head of the queue, waiting up to the specified wait time if necessary for an element to become available. Returns null after a timeout

Пожалуйста, отредактируйте вопрос с достаточными подробностями.

...