Как вызвать метод в моем потоке интеграции - PullRequest
0 голосов
/ 11 декабря 2018

Я создал метод, который читает CSV-файл и записывает данные в новый CSV-файл. Я хочу вызвать этот метод в своем процессе интеграции, поэтому в основном я извлекаю CSV-файл с моего FTP-сервера с именем FEFOexportBEY..csv и я хотим сгенерировать новый файл из этого csv и назвать его finalBEY.csv, настраиваемый метод должен выполнить запись, но я хочу вызвать его в потоке после опроса csv из FTP.Ниже приведено кодирование настраиваемого метода и интеграция потоков. Я обнаружил, что это можно сделать с помощью .handle, но не смог найти правильный путь.В моем старом проекте я смог сделать это в XML-конфигурации, создав бин с именем класса метода, а затем внедрив метод в маршрут, теперь я использую DSL, и должен быть возможный способ, если кто-то может помочь.

Метод обработки

@Component
public class CSVToCSVNoQ {

public CSVToCSVNoQ() {
}

public void writeCSVfinal(String payload,@Header("new") String newCSV,@Header("old") String oldCsv) throws IOException {

    CSVReader reader = null;
    reader = new CSVReader(new FileReader(oldCsv));
    FileWriter fileWriter = new FileWriter(newCSV);

    //try (CSVWriter writer = new CSVWriter(new FileWriter(newCSV), ',', CSVWriter.NO_QUOTE_CHARACTER)) {
    try(CSVWriter csvWriter = new CSVWriter(fileWriter,CSVWriter.DEFAULT_SEPARATOR,
                                                        CSVWriter.NO_QUOTE_CHARACTER)){
        List<String[]> line;
        reader.readNext();
        reader.readNext();
        SimpleDateFormat from = new SimpleDateFormat("yyyy-mm-dd");
        SimpleDateFormat to = new SimpleDateFormat("ddMMMyyyy");

        line = reader.readAll();

        Iterator<String[]> itr = line.iterator();
        while (itr.hasNext()){
            String[] array = itr.next();
            if(array[0].equals("DET")) {
                // System.out.println("Change Format " + to.format(from.parse(array[5])));
                array[5] = to.format(from.parse(array[5]));
            }
        }

        while (itr.hasNext()){
            String[] array = itr.next();
            System.out.println("Line " + itr.next());
        }

        csvWriter.writeAll(line);
        csvWriter.close();

    } catch (IOException e) {
        e.printStackTrace();
    } catch (ParseException e) {
        e.printStackTrace();
    }

    try {
        reader.close();
    } catch (IOException e) {
        e.printStackTrace();
    }

}

}

Поток интеграции на входящем.

 public IntegrationFlow fileInboundFlowFromFTPServer(Branch myBranch) throws IOException {

    final FtpInboundChannelAdapterSpec sourceSpecFtp = Ftp.inboundAdapter(createNewFtpSessionFactory(myBranch))
            .preserveTimestamp(true)
          //.patternFilter("*.csv")
            .maxFetchSize(MAX_MESSAGES_PER_POLL)
            .remoteDirectory(myBranch.getFolderPath())
            .regexFilter("FEFOexport"+myBranch.getBranchCode()+".csv")
            .deleteRemoteFiles(true)
            .localDirectory(new File(myBranch.getBranchCode()))
            .temporaryFileSuffix(TEMPORARY_FILE_SUFFIX)

            /*.localFilenameExpression(new FunctionExpression<String>(s -> {
                final int fileTypeSepPos = s.lastIndexOf('.');
                return DateTimeFormatter
                        .ofPattern(TIMESTAMP_FORMAT_OF_FILES)
                        .withZone(ZoneId.of(TIMEZONE_UTC))
                        .format(Instant.now())
                        + "_"
                        + s.substring(0,fileTypeSepPos)
                        + s.substring(fileTypeSepPos);
            }))*/;

    // Poller definition
    final Consumer<SourcePollingChannelAdapterSpec> stockInboundPoller = endpointConfigurer -> endpointConfigurer
            .id("stockInboundPoller")
            .autoStartup(true)
            .poller(poller());

    IntegrationFlow flow = IntegrationFlows
            .from(sourceSpecFtp, stockInboundPoller)

            .transform(File.class, p ->{
                // log step
                LOG1.info("flow=stockInboundFlowFromAFT, message=incoming file: " + p);
                return p;
            })

            .channel(CHANNEL_INTERMEDIATE_STAGE)
            .handle(m -> {
                try {
                    this.csvToCSVNoQ.writeCSVfinal("test", myBranch.getBranchCode() + "/final" + myBranch.getBranchCode() + ".csv", myBranch.getBranchCode() + "/FEFOexport" + myBranch.getBranchCode() + ".csv");
                    LOG1.info("Writing final file .csv " + m);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            })
            .get();

    return flow;
}

Конфигурация старого XML

 <bean id="myBeanId" class="com.preparationforinterview.csvprocessing.CSVTOCSVNOQUT"/>


    <!--BEY routing-->
    <route>
        <from uri="ftp://XXXX@XXX.XXX.XX:21/ftp/erbranch/EDMS/FEFO/?fileName=FEFOexportBEY.csv"/>
        <log message="Level=INFO&amp;showBody=true&amp;showHeaders=true"/>
        <to uri="file:input"/>
        <bean ref="myBeanId" method="writeCSVfinal(output\finalBEY.csv,input\FEFOexportBEY.csv)"/>
        <bean ref="myBeanId" method="readCSVOrder(input\FEFOexportBEY.csv)"/>
        <log message="Inserting orders and details in the database from BEY"/>
        <to uri="ftp://XXXX@XXX.XXX.XX:21/ftp/erbranch/EDMS/FEFO/History/?autoCreate=true&amp"/>
    </route>

1 Ответ

0 голосов
/ 11 декабря 2018

Прежде всего, ваше определение ссылки на метод полностью неверно в этом .handle().

В любом случае, давайте пока забудем об этом!

С другой стороны, ваша writeCSVfinal() сигнатура метода не являетсятак что сообщения дружелюбны.У вас есть два аргумента, которые с точки зрения инициатора идентичны, и вы будете иметь двусмысленность, потому что фреймворк не может определить, какой из них составляет примерно payload из сообщения запроса.

Но мы все же можем достичь целис вашим методом и .handle() следующим образом:

.handle(m -> this.cSVToCSVNoQ.writeCSVfinal(m.getPayload(), m.getHeaders().get(...)))

Или какой-то другой логикой для определения значений аргументов метода.

Также имейте в виду, что ваш метод возвращает voidпоэтому такой обработчик может использоваться только в конце потока.Просто нет ничего, что было бы отправлено на следующий канал в цепочке.

Полностью не ясно, как работает ваш верблюжий код.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...