У меня есть поток интеграции, который обрабатывает метод.
После извлечения файла с FTP-сервера на локальный этот метод читает файл csv ex: foo.csv и создает новый файл bar.csv, затем bar.csv снова переходит с ftpd на FTP-сервер, и теперь проблема в том, что метод продолжает Чтение foo.csv и создание нового bar.csv и его отправка на основе метода опроса. Это делается в методе fileInboundFlowFromFTPServer. Мне нужно выполнить этот процесс один раз, а не повторять этот же файл foo.csv, если он не был изменен или новый foo Вытащил .csv, я использовал метаданное JDBC с помощью @Gary Russell, который работает идеально по мере необходимости, но так как методы-обработчики продолжают читать foo.csv и создают новый bar.csv, тогда дата изменяется и, таким образом, metadatastore обновляется, и файл отправляется снова. Я думаю о решении изменить имя foo.csv, скажем, на foo_10012019.csv и отправить его снова на FTP-сервер в папку History вниз по течению и удалить его из локальной сети, как я могу это сделать? создать новый поток только для части отправки foo_10012019.csv?
вот мой класс интеграции:
public class FTIntegration {
public static final String TIMEZONE_UTC = "UTC";
public static final String TIMESTAMP_FORMAT_OF_FILES = "yyyyMMddHHmmssSSS";
public static final String TEMPORARY_FILE_SUFFIX = ".part";
public static final int POLLER_FIXED_PERIOD_DELAY = 5000;
public static final int MAX_MESSAGES_PER_POLL = 100;
private DataSource dataSource;
//private static final Logger LOG = LoggerFactory.getLogger(FTIntegration.class);
private static final Logger LOG1 = Logger.getLogger(FTIntegration.class);
private static final String CHANNEL_INTERMEDIATE_STAGE = "intermediateChannel";
private static final String OUTBOUND_CHANNEL = "outboundChannel";
/* pulling the server config from postgres DB*/
private final BranchRepository branchRepository;
private CSVToCSVNoQ csvToCSVNoQ;
private String localTempPath;
public FTIntegration(BranchRepository branchRepository) {
this.branchRepository = branchRepository;
public Branch myBranch(){
return new Branch();
* The default poller with 5s, 100 messages, RotatingServerAdvice and transaction.
* @return default poller.
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller(){
return Pollers
* The direct channel for the flow.
* @return MessageChannel
public MessageChannel stockIntermediateChannel() {
return new DirectChannel();
* Get the files from a remote directory. Add a timestamp to the filename
* and write them to a local temporary folder.
* @return IntegrationFlow
public PropertiesPersistingMetadataStore store() {
PropertiesPersistingMetadataStore store = new PropertiesPersistingMetadataStore();
return store;
public IntegrationFlow fileInboundFlowFromFTPServer(Branch myBranch) throws IOException {
final FtpInboundChannelAdapterSpec sourceSpecFtp = Ftp.inboundAdapter(createNewFtpSessionFactory(myBranch))
.localDirectory(new File(myBranch.getBranchCode()))
/*.localFilenameExpression(new FunctionExpression<String>(s -> {
final int fileTypeSepPos = s.lastIndexOf('.');
return DateTimeFormatter
+ "_"
+ s.substring(0,fileTypeSepPos)
+ s.substring(fileTypeSepPos);
// Poller definition
final Consumer<SourcePollingChannelAdapterSpec> stockInboundPoller = endpointConfigurer -> endpointConfigurer
IntegrationFlow flow = IntegrationFlows
.from(sourceSpecFtp, stockInboundPoller)
.transform(File.class, p ->{
// log step
LOG1.info("flow=stockInboundFlowFromAFT, message=incoming file: " + p);
return p;
.handle(m -> {
try {
this.csvToCSVNoQ.writeCSVfinal("test", myBranch.getBranchCode() + "/final" + myBranch.getBranchCode() + ".csv", myBranch.getBranchCode() + "/FEFOexport" + myBranch.getBranchCode() + ".csv");
LOG1.info("Writing final file .csv " + m);
} catch (IOException e) {
return flow;
public IntegrationFlow stockIntermediateStageChannel() {
IntegrationFlow flow = IntegrationFlows
.transform(p -> {
//log step
LOG1.info("flow=stockIntermediateStageChannel, message=rename file: " + p);
return p;
.channel(new NullChannel())
return flow;
* Creating the outbound adaptor to send files from local to FTP server
* */
public IntegrationFlow localToFtpFlow(Branch myBranch){
return IntegrationFlows.from(Files.inboundAdapter(new File(myBranch.getBranchCode()))
.filter(new ChainFileListFilter<File>()
.addFilter(new RegexPatternFileListFilter("final" + myBranch.getBranchCode() +".csv"))
.addFilter(new FileSystemPersistentAcceptOnceFileListFilter(metadataStore(dataSource), "foo"))),//FileSystemPersistentAcceptOnceFileListFilter
e -> e.poller(Pollers.fixedDelay(10_000)))
.transform( p ->{
LOG1.info("Sending file " + p + " to FTP branch " + myBranch.getBranchCode());
return p;
.remoteDirectory(myBranch.getFolderPath()), e -> e.advice(expressionAdvice()))
public Advice expressionAdvice() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setOnSuccessExpressionString("payload.delete() + ' was successful'");
advice.setOnFailureExpressionString("payload + ' was bad, with reason: ' + #exception.cause.message");
return advice;
public DefaultFtpSessionFactory createNewFtpSessionFactory(Branch branch){
final DefaultFtpSessionFactory factory = new DefaultFtpSessionFactory();
return factory;
public ConcurrentMetadataStore metadataStore(final DataSource dataSource) {
return new JdbcMetadataStore(dataSource);