Отправка файлов из локальной папки на FTP с помощью Spring Integration - PullRequest
0 голосов
/ 28 ноября 2018

У меня есть проблема, когда я опрашиваю данные с FTP-сервера (ов) в локальную папку, например, файл FEFOexportBEY.csv, как только этот файл находится в удаленном каталоге, я опрашиваю его локально, без проблем,затем я использую этот файл для создания нового файла с именем finalBEY.csv в моей локальной папке, затем я хочу передать поток этого файла в папку ftp, где я получил исходную, моя проблема в том, что я смог отправитьfinalBEY.csv только один раз, этот процесс будет происходить часто, поэтому, если я вытащу FEFOexportBEY.csv 3 раза в день, я сгенерирую finalBEY.csv три раза и отправлю один и тот же три раза в нисходящий поток, он не работаетсо мной для этого, это только отправляет его один раз, и если я пытаюсь удалить finalBEY.csv и сгенерировал новый, приложение не отправляет его, ниже весь мой код в файле конфигурации и файле контроллера, пожалуйста, помогитедайте мне знать, как я могу продолжать смотреть или опрашивать локальную папку, например, BEY для нового finalBEY.csv и отправлять егопо назначению.

     @Configuration

        @EnableIntegration
        public class FTIntegration {

        public static final String TIMEZONE_UTC = "UTC";
        public static final String TIMESTAMP_FORMAT_OF_FILES = "yyyyMMddHHmmssSSS";
        public static final String TEMPORARY_FILE_SUFFIX = ".part";
        public static final int POLLER_FIXED_PERIOD_DELAY = 5000;
        public static final int MAX_MESSAGES_PER_POLL = 100;


        private static final Logger LOG = LoggerFactory.getLogger(FTIntegration.class);
        private static final String CHANNEL_INTERMEDIATE_STAGE = "intermediateChannel";

        private static final String OUTBOUND_CHANNEL = "outboundChannel";

        /* pulling the server config from postgres DB*/

        private final BranchRepository branchRepository;

        @Value("${app.temp-dir}")
        private String localTempPath;

        public FTIntegration(BranchRepository branchRepository) {
            this.branchRepository = branchRepository;
        }

        @Bean
        public Branch myBranch(){
            return new Branch();
        }

        /**
         * The default poller with 5s, 100 messages, RotatingServerAdvice and transaction.
         *
         * @return default poller.
         */
        @Bean(name = PollerMetadata.DEFAULT_POLLER)
        public PollerMetadata poller(){
            return Pollers
                    .fixedDelay(POLLER_FIXED_PERIOD_DELAY)
                    .maxMessagesPerPoll(MAX_MESSAGES_PER_POLL)
                    .transactional()
                    .get();
        }

        /**
         * The direct channel for the flow.
         *
         * @return MessageChannel
         */
        @Bean
        public MessageChannel stockIntermediateChannel() {
            return new DirectChannel();
        }
        /**
         * Get the files from a remote directory. Add a timestamp to the filename
         * and write them to a local temporary folder.
         *
         * @return IntegrationFlow
         */


          //@Bean
           public IntegrationFlow fileInboundFlowFromFTPServer(Branch myBranch){

            final FtpInboundChannelAdapterSpec sourceSpecFtp = Ftp.inboundAdapter(createNewFtpSessionFactory(myBranch))
                    .preserveTimestamp(true)
                    //.patternFilter("*.csv")
                    .maxFetchSize(MAX_MESSAGES_PER_POLL)
                    .remoteDirectory(myBranch.getFolderPath())
                    .regexFilter("FEFOexport"+myBranch.getBranchCode()+".csv")
                    .deleteRemoteFiles(true)
                    .localDirectory(new File(myBranch.getBranchCode()))
                    .localFilter(new AcceptAllFileListFilter())
                        .temporaryFileSuffix(TEMPORARY_FILE_SUFFIX)
                        .localFilenameExpression(new FunctionExpression<String>(s -> {
                            final int fileTypeSepPos = s.lastIndexOf('.');
                            return DateTimeFormatter
                                    .ofPattern(TIMESTAMP_FORMAT_OF_FILES)
                                    .withZone(ZoneId.of(TIMEZONE_UTC))
                                    .format(Instant.now())
                                    + "_"
                                    + s.substring(0,fileTypeSepPos)
                                    + s.substring(fileTypeSepPos);
                        }));

            // Poller definition
        final Consumer<SourcePollingChannelAdapterSpec> stockInboundPoller = endpointConfigurer -> endpointConfigurer
                .id("stockInboundPoller")
                .autoStartup(true)
                .poller(poller());

        IntegrationFlow flow = IntegrationFlows
                .from(sourceSpecFtp, stockInboundPoller)

                .transform(File.class, p ->{
                    // log step
                    LOG.info("flow=stockInboundFlowFromAFT, message=incoming file: " + p);
                    return p;
                })
                .channel(CHANNEL_INTERMEDIATE_STAGE)
                .get();

            return flow;
        }

       @Bean
        public IntegrationFlow stockIntermediateStageChannel() {
            IntegrationFlow flow = IntegrationFlows
                    .from(CHANNEL_INTERMEDIATE_STAGE)
                    .transform(p -> {
                        //log step
                        LOG.info("flow=stockIntermediateStageChannel, message=rename file: " + p);
                        return p;
                    })
                    //TODO
                    .channel(new NullChannel())
                    .get();

            return flow;

        }

        /*
        * Creating the outbound adaptor
        *
        * */

        public IntegrationFlow localToFtpFlow(Branch myBranch){
    return IntegrationFlows.from(Files.inboundAdapter(new File(myBranch.getBranchCode()))
            .filter(new FileSystemPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "foo")),
                    e -> e.poller(Pollers.fixedDelay(10_000)))
            .log()
            .handle(Ftp.outboundAdapter(createNewFtpSessionFactory(myBranch), FileExistsMode.REPLACE)
                    .useTemporaryFileName(true)
                    .autoCreateDirectory(true)
                    .remoteDirectory(myBranch.getFolderPath()))
            .get();
}


public interface SendToFtpDirect{
          void send(byte[] bytes, @Header(FileHeaders.FILENAME) String filename);

             }

        public DefaultFtpSessionFactory createNewFtpSessionFactory(Branch branch){
            final DefaultFtpSessionFactory factory = new DefaultFtpSessionFactory();
            factory.setHost(branch.getHost());
            factory.setUsername(branch.getUsern());
            factory.setPort(branch.getFtpPort());
            factory.setPassword(branch.getPassword());
            return factory;
        }

    }

Класс контроллера:

    @Controller
public class BranchController {

    private BranchService branchService;
    private BranchToBranchForm branchToBranchForm;

    //@Autowired
    private Branch branch;

    @Autowired
    private FTIntegration.MyGateway myGateway;

    @Autowired
    private FTIntegration ftIntegration;

    @Autowired
    private IntegrationFlowContext flowContext;

private FTIntegration.SendToFtpDirect gate;

    @Autowired
    public void setBranchService(BranchService branchService) {
        this.branchService = branchService;
    }

    @Autowired
    public void setBranchToBranchForm(BranchToBranchForm branchToBranchForm) {
        this.branchToBranchForm = branchToBranchForm;
    }

    @RequestMapping( "/")
    public String branch(){return "redirect:/branch/list";}

    @RequestMapping({"/branch/list","/branch"})
    public String listBranches(Model model){
        model.addAttribute("branches",branchService.listAll());
        return "branch/list";
    }

    @RequestMapping("/branch/showbranch/{id}")
    public String getBranch (@PathVariable String id, Model model){
       model.addAttribute("branch", branchService.getById(Long.valueOf(id)));
       addFlowFtp(id);
       addFlowftpOutbound(id);
       return "/branch/showbranch";

    }

    @RequestMapping("/branch/edit/{id}")
    public String edit(@PathVariable String id, Model model){
        Branch branch = branchService.getById(Long.valueOf(id));
        BranchForm branchForm = branchToBranchForm.convert(branch);
        model.addAttribute("branchForm",branchForm);
        return "branch/branchform";

    }

    @RequestMapping("/branch/new")
    public String newBranch(Model model){
        model.addAttribute("branchForm", new BranchForm());
         return "branch/branchform";
    }

    //@PostMapping
    @RequestMapping(value = "/branch", method = RequestMethod.POST)
    public String saveOrUpdateBranch(@Valid BranchForm branchForm, BindingResult bindingResult){

        if(bindingResult.hasErrors()){
            return "branch/branchform";
        }

        Branch savedBranch = branchService.saveOrUpdateBranchForm(branchForm);
        return "redirect:/branch/showbranch/" + savedBranch.getId();
    }

    @RequestMapping("/branch/delete/{id}")
    private String delete(@PathVariable String id){
        branchService.delete(Long.valueOf(id));
        flowContext.remove(id);
        flowContext.remove(id+"o");
        return "redirect:/branch/list";
    }

    private void addFlowFtp(String name) {
        branch = branchService.getById(Long.valueOf(name));
        System.out.println(branch.getBranchCode());
        IntegrationFlow flow = ftIntegration.fileInboundFlowFromFTPServer(branch);
        this.flowContext.registration(flow).id(name).register();
    }

   private void addFlowftpOutbound(String name) {
    branch = branchService.getById(Long.valueOf(name));
    System.out.println(branch.getBranchCode());
    IntegrationFlow flow = ftIntegration.localToFtpFlow(branch);//ftpOutboundFlow(branch);
    this.flowContext.registration(flow).id(name +"o").register();
    //gate.send("BEY".getBytes(),"final"+ branch.getBranchCode()+ ".csv" );

}


}

1 Ответ

0 голосов
/ 28 ноября 2018

Если вы используете последнюю версию, новая версия файла должна пройти, если изменилась его измененная временная метка.

См. документацию .

Вы можете использовать атрибут local-filter для настройки поведения фильтра локальной файловой системы.Начиная с версии 4.3.8, FileSystemPersistentAcceptOnceFileListFilter настраивается по умолчанию.Этот фильтр сохраняет принятые имена файлов и измененную временную метку в экземпляре стратегии MetadataStore (см. Раздел 12.5, «Хранилище метаданных») и обнаруживает изменения локального времени изменения файла.MetadataStore по умолчанию - это SimpleMetadataStore, который хранит состояние в памяти.

Проверьте, что находится в локальном фильтре;также включите ведение журнала отладки, чтобы узнать, предоставляет ли он вам дополнительную информацию.

РЕДАКТИРОВАТЬ

Это прекрасно работает для меня ...

@SpringBootApplication
public class So53521657Application {

    public static void main(String[] args) {
        SpringApplication.run(So53521657Application.class, args);
    }

    @Bean
    public CachingSessionFactory<FTPFile> sf() {
        DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
        sf.setHost("10.0.0.3");
        sf.setUsername("ftptest");
        sf.setPassword("ftptest");
        return new CachingSessionFactory<>(sf);
    }

    @Bean
    public IntegrationFlow webToFtpFlow() {
        return IntegrationFlows.from(SendToFtpDirect.class)
                .log()
                .handle(Ftp.outboundAdapter(sf()).remoteDirectory("foo"))
                .get();
    }

    @Bean
    public IntegrationFlow ftpToLocalFlow() {
        return IntegrationFlows.from(Ftp.inboundAdapter(sf())
                    .remoteDirectory("foo")
                    .deleteRemoteFiles(true)
                    .localFilter(new SimplePatternFileListFilter("*.csv"))
                    .localDirectory(new File("/tmp/foo")), e ->
                        e.poller(Pollers.fixedDelay(5_000)))
                .log()
                .<File>handle((p, h) -> {
                        File newFile = new File("/tmp/bar/" + p.getName().replace(".csv", ".txt"));
                        newFile.delete();
                        System.out.println("renaming " + p + " to " + newFile);
                        p.renameTo(newFile);
                        return p;
                    })
                .log()
                .nullChannel();
    }

    @Bean
    public IntegrationFlow localToFtpFlow() {
        return IntegrationFlows.from(Files.inboundAdapter(new File("/tmp/bar"))
                        .filter(new FileSystemPersistentAcceptOnceFileListFilter(
                                new SimpleMetadataStore(), "foo")), e ->
                    e.poller(Pollers.fixedDelay(10_000)))
                .log()
                .handle(Ftp.outboundAdapter(sf())
                        .remoteDirectory("bar"))
                .get();
    }

}

@RestController
@DependsOn("webToFtpFlow")
class Controller {

    @Autowired
    private SendToFtpDirect gate;

    @GetMapping(path = "/send/{name}")
    public String send(@PathVariable String name) {
        gate.send("foo".getBytes(), name + ".csv");
        return name + " sent";
    }

}

interface SendToFtpDirect {

    void send(byte[] bytes, @Header(FileHeaders.FILENAME) String filename);

}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...