У меня есть верблюжий маршрут при получении файлов, иногда эти файлы могут содержать несколько тысяч пустых строк или записей. это происходит в конце файлов.
помощь или совет о том, как справиться с этой ситуацией.
2/3/20 0:25,12.0837099,22.07255971,51.15338002,52.76662495,52.34712651,51.12155216,45.7655507,49.96555147,54.47205637,50.66135512,54.43864717,54.31627797,112.11765,1305.89126,1318.734411,52.31780487,44.27374363,48.72548294,43.01383257,23.85434055,41.98898447,47.50916052,31.13055873,112.2747269,0.773642045,1.081464888,2.740194779,1.938788885,1.421660186,0.617588546,21.28219363,25.03362771,26.76627344,40.21132809,29.72854555,33.45911109
,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
Маршрут идет в сплиттер.
<route autoStartup="true" id="core.predix.accept.file.type.route">
<from id="_from3" uri="{{fileEntranceEndpoint}}"/>
<convertBodyTo id="_convertBodyTo1" type="java.lang.String"/>
<split id="_split1" strategyRef="csvAggregationStrategy" streaming="true" stopOnException="true">
<tokenize token="\n"/>
<process id="_process3" ref="toCsvFormat"/>
<!-- passthru only we do not allow embedded commas in numeric data -->
</split>
<log id="_log1" loggingLevel="INFO" message="CSV body: ${body}"/>
<choice id="_choice1">
<when id="_when1">
<simple>${header.CamelFileName} regex '^.*\.(csv|CSV|txt|gpg)$'</simple>
<log id="_log2" message="${file:name} accepted for processing..."/>
<choice id="_choice2">
<when id="_when2">
<simple>${header.CamelFileName} regex '^.*\.(CSV|txt|gpg)$'</simple>
<setHeader headerName="CamelFileName" id="_setHeader1">
<simple>${file:name.noext.single}.csv</simple>
</setHeader>
<log id="_log3" message="${file:name} changed file name."/>
</when>
</choice>
<split id="_split2" streaming="true">
<tokenize prop:group="noOfLines" token="\n"/>
<log id="_log4" message="Split Group Body: ${body}"/>
<to id="_to1" uri="bean:extractHeader"/>
<to id="acceptedFileType" ref="predixConsumer"/>
</split>
<to id="_to2" uri="bean:extractHeader?method=cleanHeader"/>
</when>
<otherwise id="_otherwise1">
<log id="_log5" loggingLevel="INFO" message="${file:name} is an unknown file type, sending to unhandled repo."/>
<to id="_to3" uri="{{unhandledArchive}}"/>
</otherwise>
</choice>
</route>
Простой агрегатор
public class CsvAggregationStrategy implements AggregationStrategy {
private Logger log = LoggerFactory.getLogger(CsvAggregationStrategy.class.getName());
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
//Theory
//-------------------------------------------------------------------------------------
// Arrived | oldExchange | newExchange | Description
//-------------------------------------------------------------------------------------
// A | NULL | A | first message arrives for the first group
// B | A | B | second message arrives for the first group
// F | NULL | F | first message arrives for the second group
// C | AB | C | third message arrives for the first group
//---------------------------------------------------------------------------------------
log.debug("Aggregation Strategy :: start");
if ( newExchange.getException() != null ) {
if ( oldExchange == null ) {
return newExchange;
} else {
oldExchange.setException(newExchange.getException());
return oldExchange;
}
}
if ( oldExchange == null ) { //This will set the 1st record with the Header
return newExchange;
}
String newBody = newExchange.getIn().getBody(String.class);
String oldBody = oldExchange.getIn().getBody(String.class);
String body = oldBody + newBody;
oldExchange.getIn().setBody( body );
log.debug("Aggregation Strategy :: finish");
return oldExchange;
} //Exchange process
} //class AggregationStrategy
Я думал, что обработаю пустые строки в классе toCsvFormat
Класс ToCsvFormat просто меняет входящий разделитель csv на запятую.
public class ToCsvFormat implements Processor {
private static final Logger LOG = LoggerFactory.getLogger(ToCsvFormat.class);
@Override
public void process(Exchange exchange) throws Exception {
String body = exchange.getIn().getBody(String.class);
body = body.replaceAll("\\t|;",",");
String bodyCheck = body.replaceAll(",","").trim();
LOG.info("BODY CHECK: " + bodyCheck);
if ( bodyCheck.isEmpty() || bodyCheck == null ) {
throw new IllegalArgumentException("Data record is Empty or NULL. Invalid Data!");
} else {
StringBuilder sb = new StringBuilder(body.trim());
LOG.debug("CSV Format Body In: " + sb.toString());
LOG.debug("sb length: " + sb.length());
if ( sb.toString().endsWith(",") ) {
sb.deleteCharAt(sb.lastIndexOf(",", sb.length()));
}
LOG.info("CSV Format Body Out: " + sb.toString());
sb.append(System.lineSeparator());
exchange.getIn().setBody(sb.toString());
}
}
}
*** У меня проблема в том, что мне нужно, чтобы сплиттер завершил обработку sh до тех пор, пока он не достигнет всех пустых строк, или не пропустил или не остановил сплиттер на пустые записи. но мне нужно то, что было ранее разделено или обработано. Бросок и захват исключения останавливает сплиттер, я ничего не получаю. Я использую разделитель stoponexception, но, как говорится, он останавливается на исключении.
спасибо