Если вы добавите poller.advice()
, то Advice
применяется ко всему потоку, начиная с метода poll()
. Поскольку вы уже опрашивали сообщение из этой очереди, при следующей попытке от него нечего опрашивать. Это своего рода анти-паттерн - использовать повторные попытки для нетранзакционных очередей: вы не откатываете транзакции, поэтому ваши данные не возвращаются в хранилище, чтобы быть доступными для следующих poll()
.
Нет на данный момент можно повторить весь подпоток из некоторой точки, но вы определенно можете использовать RequestHandlerRetryAdvice
на указанной c ошибочной конечной точке, как ваша transform()
с KABOOM
исключением:
.transform(p -> {
if (true) {
throw new RuntimeException("KABOOM");
}
return p;
}, e -> e.advice(new RequestHandlerRetryAdvice()))
См. В setRetryTemplate(RetryTemplate retryTemplate)
дополнительные параметры повтора вместо 3 попыток по умолчанию.
Чтобы создать подпоток, нам нужно рассмотреть возможность реализации HandleMessageAdvice
. Примерно так:
.transform(p -> p, e -> e.poller(Pollers.fixedDelay(500000))
.advice(new HandleMessageAdvice() {
RetryOperationsInterceptor delegate =
RetryInterceptorBuilder.stateless()
.maxAttempts(5)
.backOffOptions(1, 2, 10)
.build();
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
return delegate.invoke(invocation);
}
}))
Но опять же: это не poller
совет. Это конечная точка на его MessageHandler.handleMessage()
.