У меня есть два фильтра regexFilter и lastModified.
return IntegrationFlows.from(Sftp.inboundAdapter(inboundSftp)
.localDirectory(this.getlocalDirectory(config.getId()))
.deleteRemoteFiles(true)
.autoCreateLocalDirectory(true)
.regexFilter(config.getRegexFilter())
.filter(new LastModifiedLsEntryFileListFilter())
.remoteDirectory(config.getInboundDirectory())
, e -> e.poller(Pollers.fixedDelay(60_000)
.errorChannel(MessageHeaders.ERROR_CHANNEL).errorHandler((ex) -> {
})))
При поиске в Google я понимаю, что должен использовать CompositeFileListFilter для регулярных выражений, поэтому измените мой код на
.filter(new CompositeFileListFilter().addFilter(new RegexPatternFileListFilter(config.getRegexFilter())))
Его компилируется, но во время выполнения выдает ошибку, и канал останавливается, и та же самая ошибка идет для
.filter(ftpPersistantFilter(config.getRegexFilter()))
.
.
.
public CompositeFileListFilter ftpPersistantFilter(String regexFilter) {
CompositeFileListFilter filters = new CompositeFileListFilter();
filters.addFilter(new FtpRegexPatternFileListFilter(regexFilter));
return filters;
}
Я просто хочу фильтровать по имени файла. Существует 2 потока для одной и той же удаленной папки, и оба опрашивают один и тот же cron, но должны выбрать соответствующий файл.
EDIT добавление последнего LastModifiedLsEntryFileListFilter. Работает нормально, но добавляется по запросу.
public class LastModifiedLsEntryFileListFilter implements FileListFilter<LsEntry> {
private final Logger log = LoggerFactory.getLogger(LastModifiedLsEntryFileListFilter.class);
private static final long DEFAULT_AGE = 60;
private volatile long age = DEFAULT_AGE;
private volatile Map<String, Long> sizeMap = new HashMap<String, Long>();
public long getAge() {
return this.age;
}
public void setAge(long age) {
setAge(age, TimeUnit.SECONDS);
}
public void setAge(long age, TimeUnit unit) {
this.age = unit.toSeconds(age);
}
@Override
public List<LsEntry> filterFiles(LsEntry[] files) {
List<LsEntry> list = new ArrayList<LsEntry>();
long now = System.currentTimeMillis() / 1000;
for (LsEntry file : files) {
if (file.getAttrs()
.isDir()) {
continue;
}
String fileName = file.getFilename();
Long currentSize = file.getAttrs().getSize();
Long oldSize = sizeMap.get(fileName);
if(oldSize == null || currentSize.longValue() != oldSize.longValue() ) {
// putting size in map, will verify in next iteration of scheduler
sizeMap.put(fileName, currentSize);
log.info("[{}] old size [{}] increased to [{}]...", file.getFilename(), oldSize, currentSize);
continue;
}
int lastModifiedTime = file.getAttrs()
.getMTime();
if (lastModifiedTime + this.age <= now ) {
list.add(file);
sizeMap.remove(fileName);
} else {
log.info("File [{}] is still being uploaded...", file.getFilename());
}
}
return list;
}
}
PS: когда я тестировал фильтр для регулярных выражений, я удалил LastModifiedLsEntryFileListFilter просто для простоты. Итак, мой последний поток -
return IntegrationFlows.from(Sftp.inboundAdapter(inboundSftp)
.localDirectory(this.getlocalDirectory(config.getId()))
.deleteRemoteFiles(true)
.autoCreateLocalDirectory(true)
.filter(new CompositeFileListFilter().addFilter(new RegexPatternFileListFilter(config.getRegexFilter())))
//.filter(new LastModifiedLsEntryFileListFilter())
.remoteDirectory(config.getInboundDirectory()),
e -> e.poller(Pollers.fixedDelay(60_000)
.errorChannel(MessageHeaders.ERROR_CHANNEL).errorHandler((ex) -> {
try {
this.destroy(String.valueOf(config.getId()));
configurationService.removeConfigurationChannelById(config.getId());
// // logging here
} catch (Exception ex1) {
}
}))).publishSubscribeChannel(s -> s
.subscribe(f -> {
f.handle(Sftp.outboundAdapter(outboundSftp)
.useTemporaryFileName(false)
.autoCreateDirectory(true)
.remoteDirectory(config.getOutboundDirectory()), c -> c.advice(startup.deleteFileAdvice()));
})
.subscribe(f -> {
if (doArchive) {
f.handle(Sftp.outboundAdapter(inboundSftp)
.useTemporaryFileName(false)
.autoCreateDirectory(true)
.remoteDirectory(config.getInboundArchiveDirectory()));
} else {
f.handle(m -> {
});
}
})
.subscribe(f -> f
.handle(m -> {
// I am handling exception here
})
))
.get();
и вот исключения
2020-01-27 21:36:55,731 INFO o.s.i.c.PublishSubscribeChannel - Channel
'application.2.subFlow#0.channel#0' has 0 subscriber(s).
2020-01-27 21:36:55,731 INFO o.s.i.e.EventDrivenConsumer - stopped 2.subFlow#2.org.springframework.integration.config.ConsumerEndpointFactoryBean#0
2020-01-27 21:36:55,731 INFO o.s.i.c.DirectChannel - Channel 'application.2.subFlow#2.channel#0' has 0 subscriber(s).
2020-01-27 21:36:55,731 INFO o.s.i.e.EventDrivenConsumer - stopped 2.subFlow#2.org.springframework.integration.config.ConsumerEndpointFactoryBean#1
EDIT После передачи регулярного выражения в LastModifiedLsEntryFileListFilter и дескриптор там работает для меня. Когда я использую любой другой RegexFilter внутри CompositeFileListFilter, возникает ошибка.
.filter(new CompositeFileListFilter().addFilter(new LastModifiedLsEntryFileListFilter(config.getRegexFilter())))