Понимание канала очереди по умолчанию / поведения емкости - PullRequest
0 голосов
/ 22 мая 2019

У меня странная проблема с приоритетом весенней интеграции (или, по крайней мере, там, где я думаю, что все идет не так). У меня есть следующий поток:

IntegrationFlows
                .from(fileReadingMessageSource,
                        c -> c.poller(Pollers.fixedDelay(period)
                                             .taskExecutor(Executors.newFixedThreadPool(poolSize))
                                             .maxMessagesPerPoll(maxMessagesPerPoll)))
                .channel("alphabetically")
                .bridge(s -> s.poller(Pollers.fixedDelay(100)))
                .channel(ApplicationConfiguration.INBOUND_CHANNEL)
                .get();

И приоритетный канал с емкостью 1'000:

@Bean
    public PriorityChannel alphabetically(@Value("${inbound.sort.queue-capacity}") int capacity) {
        return new PriorityChannel(capacity, Comparator.comparing(left -> ((File) left.getPayload()).getName()));
    }

Я использую этот поток для чтения около 20'000 файлов из входного каталога. Все работает нормально, но после примерно 2000 файлов поток перестает работать, и он не собирает новые файлы.

Я думал, что стандартное поведение канала очереди состоит в том, что, когда он достигает емкости, он просто будет ждать освобождения емкости и примет следующие файлы, которые будут поставлены в очередь? Но я могу ошибаться ... Если это не так, и есть некоторый тайм-аут для файлов, которые будут выбраны опрашивателем и не будут иметь достаточно места в приоритетном канале, что бы вы предложили обойти? это?

1 Ответ

1 голос
/ 28 мая 2019

Добавьте запись в журнал, чтобы увидеть, что происходит;у меня это нормально работает ...

@SpringBootApplication
public class So56259801Application {

    public static void main(String[] args) {
        SpringApplication.run(So56259801Application.class, args);
    }

    private int i;

    @Bean
    public IntegrationFlow flow() {
        return IntegrationFlows.from(() -> "foo" + i++, e -> e.poller(Pollers.fixedDelay(5_000)
                    .taskExecutor(Executors.newFixedThreadPool(1))))
                .log()
                .channel(MessageChannels.queue(3))
                .bridge(b -> b.poller(Pollers.fixedDelay(10_000)))
                .log()
                .get();
    }

}

и

2019-05-28 13:43:59.719  INFO 75315 --- [pool-1-thread-1] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=foo0, headers={id=d87cba1d-dc6b-fdf4-56ed-61f08048851b, timestamp=1559065439718}]
2019-05-28 13:43:59.719  INFO 75315 --- [ask-scheduler-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=foo0, headers={id=d87cba1d-dc6b-fdf4-56ed-61f08048851b, timestamp=1559065439718}]
2019-05-28 13:43:59.724  INFO 75315 --- [           main] com.example.So56259801Application        : Started So56259801Application in 0.832 seconds (JVM running for 1.242)
2019-05-28 13:44:04.719  INFO 75315 --- [pool-1-thread-1] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=foo1, headers={id=8b7676e6-9bac-cdf0-4f4c-38513267b666, timestamp=1559065444719}]
2019-05-28 13:44:09.721  INFO 75315 --- [pool-1-thread-1] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=foo2, headers={id=3b5346f8-d007-dd33-bee3-28eed4cfbd00, timestamp=1559065449721}]
2019-05-28 13:44:10.727  INFO 75315 --- [ask-scheduler-3] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=foo1, headers={id=8b7676e6-9bac-cdf0-4f4c-38513267b666, timestamp=1559065444719}]
2019-05-28 13:44:10.727  INFO 75315 --- [ask-scheduler-3] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=foo2, headers={id=3b5346f8-d007-dd33-bee3-28eed4cfbd00, timestamp=1559065449721}]
2019-05-28 13:44:14.723  INFO 75315 --- [pool-1-thread-1] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=foo3, headers={id=84df8665-1aa9-df90-2037-4dd1781b1bf3, timestamp=1559065454723}]
2019-05-28 13:44:19.727  INFO 75315 --- [pool-1-thread-1] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=foo4, headers={id=4e81897b-a19c-4789-529a-46266762ccc6, timestamp=1559065459727}]
2019-05-28 13:44:21.733  INFO 75315 --- [ask-scheduler-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=foo3, headers={id=84df8665-1aa9-df90-2037-4dd1781b1bf3, timestamp=1559065454723}]
2019-05-28 13:44:21.733  INFO 75315 --- [ask-scheduler-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=foo4, headers={id=4e81897b-a19c-4789-529a-46266762ccc6, timestamp=1559065459727}]
2019-05-28 13:44:24.730  INFO 75315 --- [pool-1-thread-1] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=foo5, headers={id=3fb96f4a-7d25-f94a-23d2-d4a121932554, timestamp=1559065464730}]
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...