Для поддержки этих требований вам определенно необходимо изменить код упомянутого проекта File Source: https://docs.spring.io/spring-cloud-stream-app-starters/docs/Einstein.BUILD-SNAPSHOT/reference/htmlsingle/#_patching_pre_built_applications
Я бы предложил раскошелиться на проект и опросить его из GitHub как есть, так как вы собираетесьизменить существующий код проекта.Затем следуйте инструкциям в упомянутом документе, как создать целевой артефакт связующего, который будет совместим со средой SCDF.
Теперь о вопросах:
Для опроса подкаталогов для того жешаблон файла, вам нужно настроить RecursiveDirectoryScanner
на Files.inboundAdapter()
:
/**
* Specify a custom scanner.
* @param scanner the scanner.
* @return the spec.
* @see FileReadingMessageSource#setScanner(DirectoryScanner)
*/
public FileInboundChannelAdapterSpec scanner(DirectoryScanner scanner) {
Обратите внимание, что все filters
должны быть настроены на этом DirectoryScanner
.В противном случае будет выдано предупреждение:
// Check that the filter and locker options are _NOT_ set if an external scanner has been set.
// The external scanner is responsible for the filter and locker options in that case.
Assert.state(!(this.scannerExplicitlySet && (this.filter != null || this.locker != null)),
() -> "When using an external scanner the 'filter' and 'locker' options should not be used. " +
"Instead, set these options on the external DirectoryScanner: " + this.scanner);
Чтобы отслеживать файлы, лучше использовать FileSystemPersistentAcceptOnceFileListFilter
, основанный на внешнем хранилище постоянства для реализации ConcurrentMetadataStore
: https://docs.spring.io/spring-integration/reference/html/#metadata-store. Это необходимо использовать вместо этого preventDuplicates()
, потому что FileSystemPersistentAcceptOnceFileListFilter
обеспечивает только один раз логика для нас.
Удаление файла после отправки может не иметь значения, поскольку вы можете просто отправить File
как есть, и он должен быть доступен на другой стороне.
Кроме того, вы можете добавить ChannelInterceptor
в source.output()
и реализовать его postSend()
ввыполнить ((File) message.getPayload()).delete()
, что произойдет, когда сообщение будет успешно отправлено в место назначения подшивки.