Обработка выходного адаптера Java - PullRequest
0 голосов
/ 19 ноября 2018

Я создал приложение, которое добавляет входящие адаптеры во время выполнения для серверов ftp и регистрирует их для удаления на определенном этапе, если это необходимо, это приложение будет извлекать файл csv с серверов (серверов) ftp и помещать его в мой локальный в папка с именем ftp-сервера, поэтому на каждом добавляемом мной сервере будет создана отдельная локальная папка, и в ней будет сохранен файл csv, теперь это выполняется плавно, вторая часть - я хочу изменить формат этого файла и затем отправьте его обратно на соответствующий сервер, поэтому в основном мне нужно использовать исходящий адаптер, в этом случае мне потребуется создавать исходящие адаптеры во время выполнения одновременно с созданием входящего адаптера или добавлением сервера, это следует сделать с помощью Контроллер такой же, как входящий, я искал возможные решения и пробовал одно, которое ниже, но не работало или не выполняло какую-либо отправку файлов по назначению, какое-либо решение о том, как я могу это сделать? В классе конфигурации я добавил следующее:

public IntegrationFlow ftpOutboundFlow(Branch myBranch){

    return IntegrationFlows.from(OUTBOUND_CHANNEL)
            .handle(Ftp.outboundAdapter(createNewFtpSessionFactory(myBranch), FileExistsMode.FAIL)
                    .useTemporaryFileName(true)
                    .remoteFileSeparator("/")
                    //.fileNameExpression("BEY/FEFOexportBEY.csv")
            .remoteDirectory(myBranch.getFolderPath()))
            .get();

}

@MessagingGateway
public interface MyGateway {

    @Gateway(requestChannel = OUTBOUND_CHANNEL)
    void sendToFtp(File file);

}



 @Bean
    public IntegrationFlow ftpOutboundChannel() {

        return IntegrationFlows.from(OUTBOUND_CHANNEL)
                .publishSubscribeChannel(p -> p
                        .subscribe(t -> t.handle(System.out::println)))
                /*.transform(p -> {
                    LOG.info("Outbound intermediate Channel, message=rename file:" + p);
                    return p;
                })*/
                .channel(new NullChannel())
                .get();
    }

А в классе контроллеров

@RequestMapping("/branch/showbranch/{id}")
    public String getBranch (@PathVariable String id, Model model){
       model.addAttribute("branch", branchService.getById(Long.valueOf(id)));
       addFlowftp(id);
       addFlowftpOutbound(id);
        return "/branch/showbranch";

}

private void addFlowFtp(String name) {
        branch = branchService.getById(Long.valueOf(name));
        System.out.println(branch.getBranchCode());
        IntegrationFlow flow = ftIntegration.fileInboundFlowFromFTPServer(branch);
        this.flowContext.registration(flow).id(name).register();
    }

    private void addFlowftpOutbound(String name) {
        branch = branchService.getById(Long.valueOf(name));
        System.out.println(branch.getBranchCode());
        IntegrationFlow flow = ftIntegration.ftpOutboundFlow(branch);
      //  this.flowContext.registration(flow).id(name).register();
        myGateway.sendToFtp(new File("BEY/FEFOexportBEY.csv"));
    }

Вот что я получаю в консоли как ошибка, когда я включаю регистр перед отправкой файла:

java.lang.IllegalArgumentException: An IntegrationFlow 'IntegrationFlowRegistration{integrationFlow=StandardIntegrationFlow{integrationComponents={org.springframework.integration.ftp.inbound.FtpInboundFileSynchronizer@aadab28=98.org.springframework.integration.ftp.inbound.FtpInboundFileSynchronizer#0, org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean@aa290b3=stockInboundPoller, org.springframework.integration.transformer.MethodInvokingTransformer@e5f85cd=98.org.springframework.integration.transformer.MethodInvokingTransformer#0, 98.channel#0=98.channel#0, org.springframework.integration.config.ConsumerEndpointFactoryBean@319dff2=98.org.springframework.integration.config.ConsumerEndpointFactoryBean#0}}, id='98', inputChannel=null}' with flowId '98' is already registered.
An existing IntegrationFlowRegistration must be destroyed before overriding.

После второго пробного периода, когда я удалил регистрацию из первого метода и попробовал только вторым методом, но на FTP ничего не было отправлено:

GenericMessage [payload=BEY\FEFOexportBEY.csv, headers={id=43cfc2db-41e9-0866-8e4c-8e95968189ff, timestamp=1542702869007}]
2018-11-20 10:34:29.011  INFO 13716 --- [nio-8081-exec-6] f.s.s.configuration.FTIntegration        : Outbound intermediate Channel, message=rename file:BEY\FEFOexportBEY.csv

1 Ответ

0 голосов
/ 19 ноября 2018

Вам необходимо выполнить this.flowContext.registration(flow).id(name).register(); перед отправкой файла через myGateway.sendToFtp(new File("BEY/FEFOexportBEY.csv"));.

Это первое.

Другая проблема, которую я вижу в вашем коде, заключается в том, что у вас есть бин ftpOutboundChannel для IntegrationFlow, который подписан на тот же OUTBOUND_CHANNEL. Если тот не объявлен как PublishSubscribeChannel, то вы получите циклический дистрибутив. И я считаю, что вы хотели бы, чтобы файл был отправлен на FTP и вошел в систему. Таким образом, вам действительно нужно объявить этот канал как PublishSubscribeChannel.

У вас нет ошибок, потому что OUTBOUND_CHANNEL имеет ваш ftpOutboundChannel IntegrationFlow в качестве подписчика.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...