Я пытаюсь реализовать следующий сценарий:
- Я получаю набор файлов с общим шаблоном файла, т.е. doc0001_page0001, doc0001_page0002, doc0001_page0003, doc0002_page0001 (где doc0001 будет одним документом, состоящим изиз 3 страниц, которые мне нужно объединить, doc0002 будет иметь только 1 страницу)
- Я хочу объединить их так, чтобы я выпустил группу, только если собраны все файлы для конкретного документа (doc0001после того, как 3 файла были отобраны, doc0002 после 1 файла)
Моя идея состояла в том, чтобы прочитать файлы в алфавитном порядке и подождать 2 секунды после того, как группа была в последний раз изменена, чтобы освободить ее (g.getLastModified()
меньше текущего времени минус 2 секунды)
Я безуспешно пробовал следующее:
return IntegrationFlows.from(Files.inboundAdapter(tmpDir.getRoot())
.patternFilter("*.json")
.useWatchService(true)
.watchEvents(FileReadingMessageSource.WatchEventType.CREATE,
FileReadingMessageSource.WatchEventType.MODIFY),
e -> e.poller(Pollers.fixedDelay(100)
.errorChannel("filePollingErrorChannel")))
.enrichHeaders(h -> h.headerExpression("CORRELATION_PATTERN", "headers[" + FileHeaders.FILENAME + "].substring(0,7)")) // docxxxx.length()
.aggregate(a -> a.correlationExpression("headers['CORRELATION_PATTERN']")
.releaseStrategy(g -> g.getLastModified() < System.currentTimeMillis() - 2000)) .channel(MessageChannels.queue("fileReadingResultChannel"))
.get();
Изменение стратегии выпуска на следующее также не сработало:
.aggregate(a -> a.correlationExpression("headers['CORRELATION_PATTERN']")
.releaseStrategy(g -> {
Stream<Message<?>> stream = g.getMessages()
.stream();
Long timestamp = (Long) stream.skip(stream.count() - 1)
.findFirst()
.get()
.getHeaders()
.get(MessageHeaders.TIMESTAMP);
System.out.println("Timestamp: " + timestamp);
return timestamp.longValue() < System.currentTimeMillis() - 2000;
}))
Я неправильно понимаю концепцию стратегии выпуска?
Также возможно ли что-то распечатать из releaseStrблок атеги?Я хотел сравнить метку времени (см. System.out.println("Timestamp: " + timestamp);
)