Spring Integration обрабатывает проблему метода - PullRequest
0 голосов
/ 10 января 2019

У меня есть поток интеграции, который обрабатывает метод. После извлечения файла с FTP-сервера на локальный этот метод читает файл csv ex: foo.csv и создает новый файл bar.csv, затем bar.csv снова переходит с ftpd на FTP-сервер, и теперь проблема в том, что метод продолжает Чтение foo.csv и создание нового bar.csv и его отправка на основе метода опроса. Это делается в методе fileInboundFlowFromFTPServer. Мне нужно выполнить этот процесс один раз, а не повторять этот же файл foo.csv, если он не был изменен или новый foo Вытащил .csv, я использовал метаданное JDBC с помощью @Gary Russell, который работает идеально по мере необходимости, но так как методы-обработчики продолжают читать foo.csv и создают новый bar.csv, тогда дата изменяется и, таким образом, metadatastore обновляется, и файл отправляется снова. Я думаю о решении изменить имя foo.csv, скажем, на foo_10012019.csv и отправить его снова на FTP-сервер в папку History вниз по течению и удалить его из локальной сети, как я могу это сделать? создать новый поток только для части отправки foo_10012019.csv?

вот мой класс интеграции:

@Configuration
@EnableIntegration
@ComponentScan
public class FTIntegration {

public static final String TIMEZONE_UTC = "UTC";
public static final String TIMESTAMP_FORMAT_OF_FILES = "yyyyMMddHHmmssSSS";
public static final String TEMPORARY_FILE_SUFFIX = ".part";
public static final int POLLER_FIXED_PERIOD_DELAY = 5000;
public static final int MAX_MESSAGES_PER_POLL = 100;


private DataSource dataSource;

//private static final Logger LOG = LoggerFactory.getLogger(FTIntegration.class);
private static final Logger LOG1 = Logger.getLogger(FTIntegration.class);
private static final String CHANNEL_INTERMEDIATE_STAGE = "intermediateChannel";

private static final String OUTBOUND_CHANNEL = "outboundChannel";

/* pulling the server config from postgres DB*/

private final BranchRepository branchRepository;

@Autowired
private CSVToCSVNoQ csvToCSVNoQ;

@Value("${app.temp-dir}")
private String localTempPath;


public FTIntegration(BranchRepository branchRepository) {
    this.branchRepository = branchRepository;
}

@Bean
public Branch myBranch(){
    return new Branch();
}

/**
 * The default poller with 5s, 100 messages, RotatingServerAdvice and transaction.
 *
 * @return default poller.
 */
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller(){
    return Pollers
            .fixedDelay(POLLER_FIXED_PERIOD_DELAY)
            .maxMessagesPerPoll(MAX_MESSAGES_PER_POLL)
            .transactional()
            .get();
}

/**
 * The direct channel for the flow.
 *
 * @return MessageChannel
 */
@Bean
public MessageChannel stockIntermediateChannel() {
    return new DirectChannel();
}
/**
 * Get the files from a remote directory. Add a timestamp to the filename
 * and write them to a local temporary folder.
 *
 * @return IntegrationFlow
 */

@Bean
public PropertiesPersistingMetadataStore store() {
    PropertiesPersistingMetadataStore store = new PropertiesPersistingMetadataStore();
    return store;
}
   public IntegrationFlow fileInboundFlowFromFTPServer(Branch myBranch) throws IOException {

    final FtpInboundChannelAdapterSpec sourceSpecFtp = Ftp.inboundAdapter(createNewFtpSessionFactory(myBranch))
            .preserveTimestamp(true)
          //.patternFilter("*.csv")
            .maxFetchSize(MAX_MESSAGES_PER_POLL)
            .remoteDirectory(myBranch.getFolderPath())
            .regexFilter("FEFOexport"+myBranch.getBranchCode()+".csv")
            .deleteRemoteFiles(true)
            .localDirectory(new File(myBranch.getBranchCode()))
            .temporaryFileSuffix(TEMPORARY_FILE_SUFFIX)


            /*.localFilenameExpression(new FunctionExpression<String>(s -> {
                final int fileTypeSepPos = s.lastIndexOf('.');
                return DateTimeFormatter
                        .ofPattern(TIMESTAMP_FORMAT_OF_FILES)
                        .withZone(ZoneId.of(TIMEZONE_UTC))
                        .format(Instant.now())
                        + "_"
                        + s.substring(0,fileTypeSepPos)
                        + s.substring(fileTypeSepPos);
            }))*/;

    // Poller definition
    final Consumer<SourcePollingChannelAdapterSpec> stockInboundPoller = endpointConfigurer -> endpointConfigurer
            .id("stockInboundPoller")
            .autoStartup(true)
            .poller(poller());

    IntegrationFlow flow = IntegrationFlows
            .from(sourceSpecFtp, stockInboundPoller)

            .transform(File.class, p ->{
                // log step
                LOG1.info("flow=stockInboundFlowFromAFT, message=incoming file: " + p);
                return p;
            })
            .handle(m -> {
                try {
                    this.csvToCSVNoQ.writeCSVfinal("test", myBranch.getBranchCode() + "/final" + myBranch.getBranchCode() + ".csv", myBranch.getBranchCode() + "/FEFOexport" + myBranch.getBranchCode() + ".csv");
                    LOG1.info("Writing final file .csv " + m);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            })
            .get();

    return flow;
}

@Bean
public IntegrationFlow stockIntermediateStageChannel() {
    IntegrationFlow flow = IntegrationFlows
            .from(CHANNEL_INTERMEDIATE_STAGE)
            .transform(p -> {
                //log step
                LOG1.info("flow=stockIntermediateStageChannel, message=rename file: " + p);

                return p;
            })
            //TODO
            .channel(new NullChannel())
            .get();

    return flow;

}

/*
* Creating the outbound adaptor to send files from local to FTP server
*
* */


public IntegrationFlow localToFtpFlow(Branch myBranch){

         return IntegrationFlows.from(Files.inboundAdapter(new File(myBranch.getBranchCode()))
                    .filter(new ChainFileListFilter<File>()
                            .addFilter(new RegexPatternFileListFilter("final" + myBranch.getBranchCode() +".csv"))
                            .addFilter(new FileSystemPersistentAcceptOnceFileListFilter(metadataStore(dataSource), "foo"))),//FileSystemPersistentAcceptOnceFileListFilter
            e -> e.poller(Pollers.fixedDelay(10_000)))

            .transform( p ->{
                LOG1.info("Sending file " + p + " to FTP branch " + myBranch.getBranchCode());

                return p;
            })


            .log()
            .handle(Ftp.outboundAdapter(createNewFtpSessionFactory(myBranch),FileExistsMode.REPLACE)
                    .useTemporaryFileName(true)
                    .autoCreateDirectory(false)
                    .remoteDirectory(myBranch.getFolderPath()), e -> e.advice(expressionAdvice()))
                    )
            .get();
}


    @Bean
public Advice expressionAdvice() {
    ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
    //advice.setSuccessChannelName("success.input");
    advice.setOnSuccessExpressionString("payload.delete() + ' was successful'");
    //advice.setFailureChannelName("failure.input");
    advice.setOnFailureExpressionString("payload + ' was bad, with reason: ' + #exception.cause.message");
    advice.setTrapException(true);
    return advice;
}


public DefaultFtpSessionFactory createNewFtpSessionFactory(Branch branch){
    final DefaultFtpSessionFactory factory = new DefaultFtpSessionFactory();
    factory.setHost(branch.getHost());
    factory.setUsername(branch.getUsern());
    factory.setPort(branch.getFtpPort());
    factory.setPassword(branch.getPassword());
    return factory;
}

@Bean
public ConcurrentMetadataStore metadataStore(final DataSource dataSource) {
    return new JdbcMetadataStore(dataSource);
}

}

1 Ответ

0 голосов
/ 10 января 2019

OK; Частично проблема заключается в том, что вы повторно выбираете удаленный файл при каждом опросе. Однако неясно, почему оно отправляется снова как новое сообщение, потому что по умолчанию .localFilter является AcceptOnceFileListFilter, поэтому его следует игнорировать; возможно, ведение журнала отладки или запуск в отладчике поможет выяснить, что там происходит.

Вы должны добавить FtpPersistentAcceptOnceFileListFilter к .filter. Таким образом, вы будете повторно извлекать файл, только если временная метка изменится на удаленном сервере.

Кроме того, если вы хотите обработать такие ситуации, localFilter требуется FileSystemAcceptOnceFileListFilter, поэтому он пропустит файл, если отметка времени изменится.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...