Я создал метод, который читает CSV-файл и записывает данные в новый CSV-файл. Я хочу вызвать этот метод в своем процессе интеграции, поэтому в основном я извлекаю CSV-файл с моего FTP-сервера с именем FEFOexportBEY..csv и я хотим сгенерировать новый файл из этого csv и назвать его finalBEY.csv, настраиваемый метод должен выполнить запись, но я хочу вызвать его в потоке после опроса csv из FTP.Ниже приведено кодирование настраиваемого метода и интеграция потоков. Я обнаружил, что это можно сделать с помощью .handle, но не смог найти правильный путь.В моем старом проекте я смог сделать это в XML-конфигурации, создав бин с именем класса метода, а затем внедрив метод в маршрут, теперь я использую DSL, и должен быть возможный способ, если кто-то может помочь.
Метод обработки
@Component
public class CSVToCSVNoQ {
public CSVToCSVNoQ() {
}
public void writeCSVfinal(String payload,@Header("new") String newCSV,@Header("old") String oldCsv) throws IOException {
CSVReader reader = null;
reader = new CSVReader(new FileReader(oldCsv));
FileWriter fileWriter = new FileWriter(newCSV);
//try (CSVWriter writer = new CSVWriter(new FileWriter(newCSV), ',', CSVWriter.NO_QUOTE_CHARACTER)) {
try(CSVWriter csvWriter = new CSVWriter(fileWriter,CSVWriter.DEFAULT_SEPARATOR,
CSVWriter.NO_QUOTE_CHARACTER)){
List<String[]> line;
reader.readNext();
reader.readNext();
SimpleDateFormat from = new SimpleDateFormat("yyyy-mm-dd");
SimpleDateFormat to = new SimpleDateFormat("ddMMMyyyy");
line = reader.readAll();
Iterator<String[]> itr = line.iterator();
while (itr.hasNext()){
String[] array = itr.next();
if(array[0].equals("DET")) {
// System.out.println("Change Format " + to.format(from.parse(array[5])));
array[5] = to.format(from.parse(array[5]));
}
}
while (itr.hasNext()){
String[] array = itr.next();
System.out.println("Line " + itr.next());
}
csvWriter.writeAll(line);
csvWriter.close();
} catch (IOException e) {
e.printStackTrace();
} catch (ParseException e) {
e.printStackTrace();
}
try {
reader.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
Поток интеграции на входящем.
public IntegrationFlow fileInboundFlowFromFTPServer(Branch myBranch) throws IOException {
final FtpInboundChannelAdapterSpec sourceSpecFtp = Ftp.inboundAdapter(createNewFtpSessionFactory(myBranch))
.preserveTimestamp(true)
//.patternFilter("*.csv")
.maxFetchSize(MAX_MESSAGES_PER_POLL)
.remoteDirectory(myBranch.getFolderPath())
.regexFilter("FEFOexport"+myBranch.getBranchCode()+".csv")
.deleteRemoteFiles(true)
.localDirectory(new File(myBranch.getBranchCode()))
.temporaryFileSuffix(TEMPORARY_FILE_SUFFIX)
/*.localFilenameExpression(new FunctionExpression<String>(s -> {
final int fileTypeSepPos = s.lastIndexOf('.');
return DateTimeFormatter
.ofPattern(TIMESTAMP_FORMAT_OF_FILES)
.withZone(ZoneId.of(TIMEZONE_UTC))
.format(Instant.now())
+ "_"
+ s.substring(0,fileTypeSepPos)
+ s.substring(fileTypeSepPos);
}))*/;
// Poller definition
final Consumer<SourcePollingChannelAdapterSpec> stockInboundPoller = endpointConfigurer -> endpointConfigurer
.id("stockInboundPoller")
.autoStartup(true)
.poller(poller());
IntegrationFlow flow = IntegrationFlows
.from(sourceSpecFtp, stockInboundPoller)
.transform(File.class, p ->{
// log step
LOG1.info("flow=stockInboundFlowFromAFT, message=incoming file: " + p);
return p;
})
.channel(CHANNEL_INTERMEDIATE_STAGE)
.handle(m -> {
try {
this.csvToCSVNoQ.writeCSVfinal("test", myBranch.getBranchCode() + "/final" + myBranch.getBranchCode() + ".csv", myBranch.getBranchCode() + "/FEFOexport" + myBranch.getBranchCode() + ".csv");
LOG1.info("Writing final file .csv " + m);
} catch (IOException e) {
e.printStackTrace();
}
})
.get();
return flow;
}
Конфигурация старого XML
<bean id="myBeanId" class="com.preparationforinterview.csvprocessing.CSVTOCSVNOQUT"/>
<!--BEY routing-->
<route>
<from uri="ftp://XXXX@XXX.XXX.XX:21/ftp/erbranch/EDMS/FEFO/?fileName=FEFOexportBEY.csv"/>
<log message="Level=INFO&showBody=true&showHeaders=true"/>
<to uri="file:input"/>
<bean ref="myBeanId" method="writeCSVfinal(output\finalBEY.csv,input\FEFOexportBEY.csv)"/>
<bean ref="myBeanId" method="readCSVOrder(input\FEFOexportBEY.csv)"/>
<log message="Inserting orders and details in the database from BEY"/>
<to uri="ftp://XXXX@XXX.XXX.XX:21/ftp/erbranch/EDMS/FEFO/History/?autoCreate=true&"/>
</route>