Стратегия выпуска агрегатора Spring Integration на основе последних изменений - PullRequest
0 голосов
/ 14 апреля 2019

Я пытаюсь реализовать следующий сценарий:

  1. Я получаю набор файлов с общим шаблоном файла, т.е. doc0001_page0001, doc0001_page0002, doc0001_page0003, doc0002_page0001 (где doc0001 будет одним документом, состоящим изиз 3 страниц, которые мне нужно объединить, doc0002 будет иметь только 1 страницу)
  2. Я хочу объединить их так, чтобы я выпустил группу, только если собраны все файлы для конкретного документа (doc0001после того, как 3 файла были отобраны, doc0002 после 1 файла)

Моя идея состояла в том, чтобы прочитать файлы в алфавитном порядке и подождать 2 секунды после того, как группа была в последний раз изменена, чтобы освободить ее (g.getLastModified()меньше текущего времени минус 2 секунды)

Я безуспешно пробовал следующее:

return IntegrationFlows.from(Files.inboundAdapter(tmpDir.getRoot())
                                  .patternFilter("*.json")
                                  .useWatchService(true)
                                  .watchEvents(FileReadingMessageSource.WatchEventType.CREATE,
                                          FileReadingMessageSource.WatchEventType.MODIFY),
        e -> e.poller(Pollers.fixedDelay(100)
                             .errorChannel("filePollingErrorChannel")))
                       .enrichHeaders(h -> h.headerExpression("CORRELATION_PATTERN", "headers[" + FileHeaders.FILENAME + "].substring(0,7)")) // docxxxx.length()
                       .aggregate(a -> a.correlationExpression("headers['CORRELATION_PATTERN']")
                                        .releaseStrategy(g -> g.getLastModified() < System.currentTimeMillis() - 2000))                       .channel(MessageChannels.queue("fileReadingResultChannel"))
                       .get();

Изменение стратегии выпуска на следующее также не сработало:

.aggregate(a -> a.correlationExpression("headers['CORRELATION_PATTERN']")
                .releaseStrategy(g -> {
                    Stream<Message<?>> stream = g.getMessages()
                                                 .stream();
                    Long timestamp = (Long) stream.skip(stream.count() - 1)
                                                  .findFirst()
                                                  .get()
                                                  .getHeaders()
                                                  .get(MessageHeaders.TIMESTAMP);
                    System.out.println("Timestamp: " + timestamp);
                    return timestamp.longValue() < System.currentTimeMillis() - 2000;

                }))

Я неправильно понимаю концепцию стратегии выпуска?

Также возможно ли что-то распечатать из releaseStrблок атеги?Я хотел сравнить метку времени (см. System.out.println("Timestamp: " + timestamp);)

Ответы [ 2 ]

1 голос
/ 15 апреля 2019

Правильно, поскольку вы не знаете всей последовательности для группы сообщений, у вас нет другого выбора, кроме как использовать groupTimeout. Обычный releaseStrategy работает только тогда, когда сообщение поступает в агрегатор. Поскольку в момент одного сообщения у вас недостаточно информации для освобождения группы, она будет находиться в хранилище групп навсегда.

Опция groupTimeout была введена в агрегаторе специально для такого рода случаев, когда мы определенно хотели бы выпустить группу без достаточного количества сообщений для обычной группировки.

Вы можете использовать groupTimeoutExpression вместо groupTimeout на основе констант. MessageGroup является корневым объектом контекста оценки для SpEL, поэтому вы сможете получить к нему доступ к упомянутому lastModified.

.sendPartialResultOnExpiry(true) - верный вариант для решения здесь.

Больше информации в документации: https://docs.spring.io/spring-integration/reference/html/#agg-and-group-to

0 голосов
/ 14 апреля 2019

Я нашел решение для этого с другим подходом. Я до сих пор не понимаю, почему вышеперечисленное не работает.

Я также нашел более чистый способ определения корреляционной функции.

IntegrationFlows.from(Files.inboundAdapter(tmpDir.getRoot())
                                  .patternFilter("*.json")
                                  .useWatchService(true)
                                  .watchEvents(FileReadingMessageSource.WatchEventType.CREATE, FileReadingMessageSource.WatchEventType.MODIFY), e -> e
        .poller(Pollers.fixedDelay(100)))
                       .enrichHeaders(h -> h.headerFunction(IntegrationMessageHeaderAccessor.CORRELATION_ID, m -> ((String) m
                               .getHeaders()
                               .get(FileHeaders.FILENAME)).substring(0, 17)))
                       .aggregate(a -> a.groupTimeout(2000)
                                        .sendPartialResultOnExpiry(true))
                       .channel(MessageChannels.queue("fileReadingResultChannel"))
                       .get();
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...