Я создаю реализацию интеграции Sprint с двумя PollableChannel
s:
- Обычный канал
- Канал ошибок
Сообщения опрашиваются с обычный канал и обработан. Если во время обработки возникает ошибка (например, внешняя служба недоступна), сообщение отправляется в канал ошибок. Из канала ошибок он помещается в очередь на обычный канал, и цикл продолжается до тех пор, пока сообщение не будет успешно обработано.
Идея состоит в том, чтобы редко опрашивать канал ошибок, чтобы дать процессору некоторое время (надеюсь, ) восстановить.
Я смоделировал этот рабочий процесс в следующем тесте:
package com.stackoverflow.questions.sipoller;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.Router;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.messaging.Message;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.support.MessageBuilder;
import static org.awaitility.Awaitility.await;
import static org.awaitility.Durations.FIVE_MINUTES;
import static org.awaitility.Durations.ONE_HUNDRED_MILLISECONDS;
@SpringBootTest
class SiPollerApplicationTests {
private final static Logger LOG = LoggerFactory.getLogger(SiPollerApplicationTests.class);
private final static String QUEUE_CHANNEL_REGULAR = "queueChannelRegular";
private final static String QUEUE_CHANNEL_ERROR = "queueChannelError";
private final static String POLLER_PERIOD_REGULAR = "500"; // 0.5 second
private final static String POLLER_PERIOD_ERROR = "3000"; // 3 seconds
private final static AtomicInteger NUMBER_OF_ATTEMPTS = new AtomicInteger();
private final static AtomicInteger NUMBER_OF_SUCCESSES = new AtomicInteger();
private final static List<Instant> ATTEMPT_INSTANTS = Collections.synchronizedList(new ArrayList<>());
@Autowired
@Qualifier(QUEUE_CHANNEL_REGULAR)
private PollableChannel channelRegular;
@Test
void testTimingOfMessageProcessing() {
channelRegular.send(MessageBuilder.withPayload("Test message").build());
await()
.atMost(FIVE_MINUTES)
.with()
.pollInterval(ONE_HUNDRED_MILLISECONDS)
.until(
() -> {
if (NUMBER_OF_SUCCESSES.intValue() == 1) {
reportGaps();
return true;
}
return false;
}
);
}
private void reportGaps() {
List<Long> gaps = IntStream
.range(1, ATTEMPT_INSTANTS.size())
.mapToObj(
i -> Duration
.between(
ATTEMPT_INSTANTS.get(i - 1),
ATTEMPT_INSTANTS.get(i)
)
.toMillis()
)
.collect(Collectors.toList());
LOG.info("Gaps between attempts (in ms): {}", gaps);
}
@Configuration
@EnableIntegration
@Import(SiPollerApplicationTestEndpoint.class)
static class SiPollerApplicationTestConfig {
@Bean(name = QUEUE_CHANNEL_REGULAR)
public PollableChannel queueChannelRegular() {
return MessageChannels.queue(QUEUE_CHANNEL_REGULAR).get();
}
@Bean(name = QUEUE_CHANNEL_ERROR)
public PollableChannel queueChannelError() {
return MessageChannels.queue(QUEUE_CHANNEL_ERROR).get();
}
@Router(
inputChannel = QUEUE_CHANNEL_ERROR,
poller = @Poller(fixedRate = POLLER_PERIOD_ERROR)
)
public String retryProcessing() {
return QUEUE_CHANNEL_REGULAR;
}
}
@MessageEndpoint
static class SiPollerApplicationTestEndpoint {
@Autowired
@Qualifier(QUEUE_CHANNEL_ERROR)
private PollableChannel channelError;
@ServiceActivator(
inputChannel = QUEUE_CHANNEL_REGULAR,
poller = @Poller(fixedRate = POLLER_PERIOD_REGULAR)
)
public void handleMessage(Message<String> message) {
// Count and time attempts
int numberOfAttempts = NUMBER_OF_ATTEMPTS.getAndIncrement();
ATTEMPT_INSTANTS.add(Instant.now());
// First few times - refuse to process message and bounce it into
// error channel
if (numberOfAttempts < 5) {
channelError.send(message);
return;
}
// After that - process message
NUMBER_OF_SUCCESSES.getAndIncrement();
}
}
}
Зависимости pom.xml
:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>4.0.2</version>
<scope>test</scope>
</dependency>
</dependencies>
Обратите внимание на конфигурацию для Poller
s:
private final static String POLLER_PERIOD_REGULAR = "500"; // 0.5 second
private final static String POLLER_PERIOD_ERROR = "3000"; // 3 seconds
Предполагается, что обычный канал опрашивается один раз в полсекунды, а канал ошибок - один раз в три секунды.
Тест имитирует сбои во время обработки сообщения: первые пять попыток обработки сообщения отклоняются. Кроме того, тест записывает Instant
каждой попытки обработки. В конце концов, на моей машине тест выдает:
Gaps between attempts (in ms): [1, 0, 0, 0, 0]
Другими словами, сообщение повторяется почти сразу после каждого сбоя.
Мне кажется, что я принципиально неправильно понять, как Poller
работает в Spring Integration. Итак, мои вопросы:
- Почему существует такой диссонанс между конфигурацией опроса и фактической частотой опроса.
- Предоставляет ли Spring Integration способ реализации шаблона, который я описал ?