Как работает @ Poller-s в Spring Integration? - PullRequest
0 голосов
/ 27 апреля 2020

Я создаю реализацию интеграции Sprint с двумя PollableChannel s:

  1. Обычный канал
  2. Канал ошибок

Сообщения опрашиваются с обычный канал и обработан. Если во время обработки возникает ошибка (например, внешняя служба недоступна), сообщение отправляется в канал ошибок. Из канала ошибок он помещается в очередь на обычный канал, и цикл продолжается до тех пор, пока сообщение не будет успешно обработано.

Идея состоит в том, чтобы редко опрашивать канал ошибок, чтобы дать процессору некоторое время (надеюсь, ) восстановить.

Я смоделировал этот рабочий процесс в следующем тесте:

package com.stackoverflow.questions.sipoller;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.Router;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.messaging.Message;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.support.MessageBuilder;
import static org.awaitility.Awaitility.await;
import static org.awaitility.Durations.FIVE_MINUTES;
import static org.awaitility.Durations.ONE_HUNDRED_MILLISECONDS;

@SpringBootTest
class SiPollerApplicationTests {

    private final static Logger LOG = LoggerFactory.getLogger(SiPollerApplicationTests.class);

    private final static String QUEUE_CHANNEL_REGULAR = "queueChannelRegular";
    private final static String QUEUE_CHANNEL_ERROR = "queueChannelError";

    private final static String POLLER_PERIOD_REGULAR = "500"; // 0.5 second
    private final static String POLLER_PERIOD_ERROR = "3000"; // 3 seconds

    private final static AtomicInteger NUMBER_OF_ATTEMPTS = new AtomicInteger();
    private final static AtomicInteger NUMBER_OF_SUCCESSES = new AtomicInteger();
    private final static List<Instant> ATTEMPT_INSTANTS = Collections.synchronizedList(new ArrayList<>());

    @Autowired
    @Qualifier(QUEUE_CHANNEL_REGULAR)
    private PollableChannel channelRegular;

    @Test
    void testTimingOfMessageProcessing() {
        channelRegular.send(MessageBuilder.withPayload("Test message").build());

        await()
                .atMost(FIVE_MINUTES)
                .with()
                .pollInterval(ONE_HUNDRED_MILLISECONDS)
                .until(
                        () -> {
                            if (NUMBER_OF_SUCCESSES.intValue() == 1) {
                                reportGaps();
                                return true;
                            }
                            return false;
                        }
                );
    }

    private void reportGaps() {
        List<Long> gaps = IntStream
                .range(1, ATTEMPT_INSTANTS.size())
                .mapToObj(
                        i -> Duration
                                .between(
                                        ATTEMPT_INSTANTS.get(i - 1),
                                        ATTEMPT_INSTANTS.get(i)
                                )
                                .toMillis()
                )
                .collect(Collectors.toList());
        LOG.info("Gaps between attempts (in ms): {}", gaps);
    }

    @Configuration
    @EnableIntegration
    @Import(SiPollerApplicationTestEndpoint.class)
    static class SiPollerApplicationTestConfig {

        @Bean(name = QUEUE_CHANNEL_REGULAR)
        public PollableChannel queueChannelRegular() {
            return MessageChannels.queue(QUEUE_CHANNEL_REGULAR).get();
        }

        @Bean(name = QUEUE_CHANNEL_ERROR)
        public PollableChannel queueChannelError() {
            return MessageChannels.queue(QUEUE_CHANNEL_ERROR).get();
        }

        @Router(
                inputChannel = QUEUE_CHANNEL_ERROR,
                poller = @Poller(fixedRate = POLLER_PERIOD_ERROR)
        )
        public String retryProcessing() {
            return QUEUE_CHANNEL_REGULAR;
        }
    }

    @MessageEndpoint
    static class SiPollerApplicationTestEndpoint {

        @Autowired
        @Qualifier(QUEUE_CHANNEL_ERROR)
        private PollableChannel channelError;

        @ServiceActivator(
                inputChannel = QUEUE_CHANNEL_REGULAR,
                poller = @Poller(fixedRate = POLLER_PERIOD_REGULAR)
        )
        public void handleMessage(Message<String> message) {
            // Count and time attempts
            int numberOfAttempts = NUMBER_OF_ATTEMPTS.getAndIncrement();
            ATTEMPT_INSTANTS.add(Instant.now());

            // First few times - refuse to process message and bounce it into
            // error channel
            if (numberOfAttempts < 5) {
                channelError.send(message);
                return;
            }

            // After that - process message
            NUMBER_OF_SUCCESSES.getAndIncrement();
        }
    }

}

Зависимости pom.xml:

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.awaitility</groupId>
            <artifactId>awaitility</artifactId>
            <version>4.0.2</version>
            <scope>test</scope>
        </dependency>

    </dependencies>

Обратите внимание на конфигурацию для Poller s:

    private final static String POLLER_PERIOD_REGULAR = "500"; // 0.5 second
    private final static String POLLER_PERIOD_ERROR = "3000"; // 3 seconds

Предполагается, что обычный канал опрашивается один раз в полсекунды, а канал ошибок - один раз в три секунды.

Тест имитирует сбои во время обработки сообщения: первые пять попыток обработки сообщения отклоняются. Кроме того, тест записывает Instant каждой попытки обработки. В конце концов, на моей машине тест выдает:

Gaps between attempts (in ms): [1, 0, 0, 0, 0]

Другими словами, сообщение повторяется почти сразу после каждого сбоя.

Мне кажется, что я принципиально неправильно понять, как Poller работает в Spring Integration. Итак, мои вопросы:

  1. Почему существует такой диссонанс между конфигурацией опроса и фактической частотой опроса.
  2. Предоставляет ли Spring Integration способ реализации шаблона, который я описал ?

1 Ответ

1 голос
/ 27 апреля 2020

Существует два параметра, которые могут повлиять на это поведение.

QueueChannel по умолчанию очистители опустошают очередь; setMaxMessagesPerPoll(1) чтобы получать только одно сообщение в каждом опросе.

Также по умолчанию время ожидания по умолчанию QueueChannel составляет 1 секунду (1000 мс).

Так что первый опрос может быть раньше, чем вы думаете; установите его на 0 для немедленного выхода, если в очереди нет сообщений.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...