как повторить весь поток опроса в Spring-Integration - PullRequest
2 голосов
/ 17 марта 2020

Я потянулся за тем, как реализовать схему повторов (повторение потока ВЕСЬ) для потока поллинга Spring-Integration. Пожалуйста, найдите ниже мой (ошибочный) исходный код (не работает).

Что я делаю не так?
(если я ставлю точку останова на строку, выдающую исключение, она срабатывает только один раз)

большое спасибо заранее за ваше время и ваш опыт.

С наилучшими пожеланиями

nkjp

PS: может быть, вы попытаетесь расширить AbstractHandleMessageAdvice с помощью RetryTemplate?

return IntegrationFLows.from(SOME_QUEUE_CHANNEL)  
.transform(p -> p, e -> e.poller(Pollers.fixedDelay(5000)  
    .advice(RetryInterceptorBuilder.stateless().maxAttempts(5).backOffOptions(1,2,10).build())))  
.transform(p -> {  
  if (true) {
    throw new RuntimeException("KABOOM");
  }
  return p;
})
.channel(new NullChannel())
.get();

1 Ответ

1 голос
/ 17 марта 2020

Если вы добавите poller.advice(), то Advice применяется ко всему потоку, начиная с метода poll(). Поскольку вы уже опрашивали сообщение из этой очереди, при следующей попытке от него нечего опрашивать. Это своего рода анти-паттерн - использовать повторные попытки для нетранзакционных очередей: вы не откатываете транзакции, поэтому ваши данные не возвращаются в хранилище, чтобы быть доступными для следующих poll().

Нет на данный момент можно повторить весь подпоток из некоторой точки, но вы определенно можете использовать RequestHandlerRetryAdvice на указанной c ошибочной конечной точке, как ваша transform() с KABOOM исключением:

.transform(p -> {
                        if (true) {
                            throw new RuntimeException("KABOOM");
                        }
                        return p;
                    }, e -> e.advice(new RequestHandlerRetryAdvice()))

См. В setRetryTemplate(RetryTemplate retryTemplate) дополнительные параметры повтора вместо 3 попыток по умолчанию.

Чтобы создать подпоток, нам нужно рассмотреть возможность реализации HandleMessageAdvice. Примерно так:

.transform(p -> p, e -> e.poller(Pollers.fixedDelay(500000))
                            .advice(new HandleMessageAdvice() {

                                RetryOperationsInterceptor delegate =
                                        RetryInterceptorBuilder.stateless()
                                                .maxAttempts(5)
                                                .backOffOptions(1, 2, 10)
                                                .build();

                                @Override
                                public Object invoke(MethodInvocation invocation) throws Throwable {
                                    return delegate.invoke(invocation);
                                }
                            }))

Но опять же: это не poller совет. Это конечная точка на его MessageHandler.handleMessage().

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...