Почему Spring Integration Channel не может правильно сортировать мои сообщения? - PullRequest
0 голосов
/ 10 октября 2019

У меня есть такая простая конфигурация конвейера. Просто пытаюсь возиться с Spring Integration. Однако вывод странный.

Это мой код:

@Slf4j
@Configuration
@RequiredArgsConstructor
public class FileToConsoleIntegration {

    @Bean public IntegrationFlow fileToConsoleIntegrationFlow() {
        return IntegrationFlows.from(sameSourceDirectory(), spec -> spec.poller(this::getPollerSpec))
                .log(LoggingHandler.Level.WARN, "before.sort", m -> m.getHeaders().get("file_name"))
                .channel(MessageChannels.queue(5))
                .log(LoggingHandler.Level.INFO, "after.sort", m -> m.getHeaders().get("file_name"))
                .channel(alphabeticallyReversed())
                .log(LoggingHandler.Level.ERROR, "after.sort", m -> m.getHeaders().get("file_name"))
                .handle(logInfoMessageHandler())
                .get();
    }

    private PollerSpec getPollerSpec(PollerFactory p) {
        return p
                .fixedRate(1000)
                .maxMessagesPerPoll(10);
    }

    @Bean public MessageSource<File> sameSourceDirectory() {
        FileReadingMessageSource messageSource = new FileReadingMessageSource();
        messageSource.setDirectory(new File("input_dir"));
        return messageSource;
    }

    @Bean public MessageHandler logInfoMessageHandler() {
        return message ->
                log.info("Handling message with headers: {} and payload: {}",
                        message.getHeaders(), message.getPayload());
    }

    @Bean
    public PriorityChannel alphabeticallyReversed() {
        return MessageChannels.priority()
                .capacity(5)
                .comparator(Comparator.comparing(getFilename(), Comparator.reverseOrder()))
                .get();
    }

    private Function<Message<?>, String> getFilename() {
        return a -> (String) a.getHeaders().get("file_name");
    }

}

Это мой ввод:

/input_dir
    01_zhasdfha.txt
    02_usfhahjf.txt
    05_bsdfasdf.txt
    06_asdfasdf.txt

Это мой вывод:

2019-10-10 17:24:48.604  WARN 46024 --- [ask-scheduler-1] before.sort                              : 01_zhasdfha.txt
2019-10-10 17:24:48.605  INFO 46024 --- [ask-scheduler-1] after.sort                               : 01_zhasdfha.txt
2019-10-10 17:24:48.605 ERROR 46024 --- [ask-scheduler-2] after.sort                               : 01_zhasdfha.txt
2019-10-10 17:24:48.605  WARN 46024 --- [ask-scheduler-1] before.sort                              : 02_usfhahjf.txt
2019-10-10 17:24:48.605  INFO 46024 --- [ask-scheduler-1] after.sort                               : 02_usfhahjf.txt
2019-10-10 17:24:48.606  WARN 46024 --- [ask-scheduler-1] before.sort                              : 05_bsdfasdf.txt
2019-10-10 17:24:48.606 ERROR 46024 --- [ask-scheduler-2] after.sort                               : 02_usfhahjf.txt
2019-10-10 17:24:48.606  INFO 46024 --- [ask-scheduler-1] after.sort                               : 05_bsdfasdf.txt
2019-10-10 17:24:48.606 ERROR 46024 --- [ask-scheduler-2] after.sort                               : 05_bsdfasdf.txt
2019-10-10 17:24:48.606  WARN 46024 --- [ask-scheduler-1] before.sort                              : 06_asdfasdf.txt
2019-10-10 17:24:48.606  INFO 46024 --- [ask-scheduler-1] after.sort                               : 06_asdfasdf.txt
2019-10-10 17:24:48.606 ERROR 46024 --- [ask-scheduler-2] after.sort                               : 06_asdfasdf.txt
2019-10-10 17:24:48.605  INFO 46024 --- [ask-scheduler-3] c.s.i.s.i.FileToConsoleIntegration       : Handling message with headers: {file_originalFile=input_dir/01_zhasdfha.txt, id=5fcfb1cc-7187-86a8-dcf4-23e9b31b2376, file_name=01_zhasdfha.txt, file_relativePath=01_zhasdfha.txt, timestamp=1570728288603} and payload: input_dir/01_zhasdfha.txt
2019-10-10 17:24:48.607  INFO 46024 --- [ask-scheduler-3] c.s.i.s.i.FileToConsoleIntegration       : Handling message with headers: {file_originalFile=input_dir/06_asdfasdf.txt, id=cf1a978b-c577-bb14-cf45-cf5b7ce8f2a7, file_name=06_asdfasdf.txt, file_relativePath=06_asdfasdf.txt, timestamp=1570728288606} and payload: input_dir/06_asdfasdf.txt
2019-10-10 17:24:48.607  INFO 46024 --- [ask-scheduler-3] c.s.i.s.i.FileToConsoleIntegration       : Handling message with headers: {file_originalFile=input_dir/05_bsdfasdf.txt, id=c81dbcdc-6dd0-b549-4923-fe567ff6ca23, file_name=05_bsdfasdf.txt, file_relativePath=05_bsdfasdf.txt, timestamp=1570728288606} and payload: input_dir/05_bsdfasdf.txt
2019-10-10 17:24:48.607  INFO 46024 --- [ask-scheduler-3] c.s.i.s.i.FileToConsoleIntegration       : Handling message with headers: {file_originalFile=input_dir/02_usfhahjf.txt, id=3371143b-a097-a9ee-e303-d4d359875188, file_name=02_usfhahjf.txt, file_relativePath=02_usfhahjf.txt, timestamp=1570728288605} and payload: input_dir/02_usfhahjf.txt
2019-10-10 17:24:48.609  INFO 46024 --- [           main] c.s.i.siexample.SiExampleApplication     : Started SiExampleApplication in 0.743 seconds (JVM running for 1.232)

Желаемый выход:

2019-10-10 17:24:48.607  INFO 46024 --- [ask-scheduler-3] c.s.i.s.i.FileToConsoleIntegration       : Handling message with headers: {file_originalFile=input_dir/06_asdfasdf.txt, id=cf1a978b-c577-bb14-cf45-cf5b7ce8f2a7, file_name=06_asdfasdf.txt, file_relativePath=06_asdfasdf.txt, timestamp=1570728288606} and payload: input_dir/06_asdfasdf.txt
2019-10-10 17:24:48.607  INFO 46024 --- [ask-scheduler-3] c.s.i.s.i.FileToConsoleIntegration       : Handling message with headers: {file_originalFile=input_dir/05_bsdfasdf.txt, id=c81dbcdc-6dd0-b549-4923-fe567ff6ca23, file_name=05_bsdfasdf.txt, file_relativePath=05_bsdfasdf.txt, timestamp=1570728288606} and payload: input_dir/05_bsdfasdf.txt
2019-10-10 17:24:48.607  INFO 46024 --- [ask-scheduler-3] c.s.i.s.i.FileToConsoleIntegration       : Handling message with headers: {file_originalFile=input_dir/02_usfhahjf.txt, id=3371143b-a097-a9ee-e303-d4d359875188, file_name=02_usfhahjf.txt, file_relativePath=02_usfhahjf.txt, timestamp=1570728288605} and payload: input_dir/02_usfhahjf.txt
2019-10-10 17:24:48.605  INFO 46024 --- [ask-scheduler-3] c.s.i.s.i.FileToConsoleIntegration       : Handling message with headers: {file_originalFile=input_dir/01_zhasdfha.txt, id=5fcfb1cc-7187-86a8-dcf4-23e9b31b2376, file_name=01_zhasdfha.txt, file_relativePath=01_zhasdfha.txt, timestamp=1570728288603} and payload: input_dir/01_zhasdfha.txt

Разница между желаемым выходом и токовым выходом заключается в том, что порядок запутан. Как я могу это исправить?

1 Ответ

1 голос
/ 14 октября 2019

То, что вы установили размеры очереди равными 5, не означает, что инфраструктура будет ждать все 5, прежде чем нижестоящий опросщик их потянет. Если есть сообщение, ожидающее сообщения (по умолчанию каждый запрос опрашивается до 1 секунды), оно будет получено немедленно. Вы можете попробовать установить тайм-аут приема для опроса на 0, но все равно будет (небольшое) условие гонки, когда сообщения могут быть получены раньше, чем вы ожидали.

Было бы лучше использовать агрегатор, за которым следует преобразователь, чтобыотсортируйте List<> (или пользовательский выходной процессор на агрегаторе), а затем разделитель.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...