может ли верблюжий сплиттер пропустить строки сообщений с некоторым значением, например пустым или нулевым? - PullRequest
0 голосов
/ 17 февраля 2020

У меня есть верблюжий маршрут при получении файлов, иногда эти файлы могут содержать несколько тысяч пустых строк или записей. это происходит в конце файлов.

помощь или совет о том, как справиться с этой ситуацией.

2/3/20 0:25,12.0837099,22.07255971,51.15338002,52.76662495,52.34712651,51.12155216,45.7655507,49.96555147,54.47205637,50.66135512,54.43864717,54.31627797,112.11765,1305.89126,1318.734411,52.31780487,44.27374363,48.72548294,43.01383257,23.85434055,41.98898447,47.50916052,31.13055873,112.2747269,0.773642045,1.081464888,2.740194779,1.938788885,1.421660186,0.617588546,21.28219363,25.03362771,26.76627344,40.21132809,29.72854555,33.45911109
,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,

Маршрут идет в сплиттер.

    <route autoStartup="true" id="core.predix.accept.file.type.route">
        <from id="_from3" uri="{{fileEntranceEndpoint}}"/>
        <convertBodyTo id="_convertBodyTo1" type="java.lang.String"/>
        <split id="_split1" strategyRef="csvAggregationStrategy" streaming="true" stopOnException="true">
            <tokenize token="\n"/>
            <process id="_process3" ref="toCsvFormat"/>
            <!-- passthru only we do not allow embedded commas in numeric data -->
        </split>
        <log id="_log1" loggingLevel="INFO" message="CSV body: ${body}"/>
        <choice id="_choice1">
            <when id="_when1">
                <simple>${header.CamelFileName} regex '^.*\.(csv|CSV|txt|gpg)$'</simple>
                <log id="_log2" message="${file:name} accepted for processing..."/>
                <choice id="_choice2">
                    <when id="_when2">
                        <simple>${header.CamelFileName} regex '^.*\.(CSV|txt|gpg)$'</simple>
                        <setHeader headerName="CamelFileName" id="_setHeader1">
                            <simple>${file:name.noext.single}.csv</simple>
                        </setHeader>
                        <log id="_log3" message="${file:name} changed file name."/>
                    </when>
                </choice>
                <split id="_split2" streaming="true">
                    <tokenize prop:group="noOfLines" token="\n"/>
                    <log id="_log4" message="Split Group Body: ${body}"/>
                    <to id="_to1" uri="bean:extractHeader"/>
                    <to id="acceptedFileType" ref="predixConsumer"/>
                </split>
                <to id="_to2" uri="bean:extractHeader?method=cleanHeader"/>
            </when>
            <otherwise id="_otherwise1">
                <log id="_log5" loggingLevel="INFO" message="${file:name} is an unknown file type, sending to unhandled repo."/>
                <to id="_to3" uri="{{unhandledArchive}}"/>
            </otherwise>
        </choice>
    </route>

Простой агрегатор

public class CsvAggregationStrategy implements AggregationStrategy {

private Logger log = LoggerFactory.getLogger(CsvAggregationStrategy.class.getName());

@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {

    //Theory 
    //-------------------------------------------------------------------------------------
    // Arrived    | oldExchange  |  newExchange | Description
    //-------------------------------------------------------------------------------------
    // A         | NULL         |  A            | first message arrives for the first group
    // B         | A            |  B            | second message arrives for the first group
    // F         | NULL         |  F            | first message arrives for the second group
    // C         | AB           |  C            | third message arrives for the first group
    //---------------------------------------------------------------------------------------
    log.debug("Aggregation Strategy :: start");

    if ( newExchange.getException() != null ) {
      if ( oldExchange == null ) {
        return newExchange;
      } else {
        oldExchange.setException(newExchange.getException());
        return oldExchange;
        }
    }

    if ( oldExchange == null ) {  //This will set the 1st record with the Header
        return newExchange;
    }

String newBody = newExchange.getIn().getBody(String.class);
String oldBody = oldExchange.getIn().getBody(String.class);
String body = oldBody + newBody;
oldExchange.getIn().setBody( body );

log.debug("Aggregation Strategy :: finish");
return oldExchange;

} //Exchange process
} //class AggregationStrategy

Я думал, что обработаю пустые строки в классе toCsvFormat

Класс ToCsvFormat просто меняет входящий разделитель csv на запятую.

public class ToCsvFormat implements Processor {

private static final Logger LOG = LoggerFactory.getLogger(ToCsvFormat.class);

@Override
public void process(Exchange exchange) throws Exception {

    String body = exchange.getIn().getBody(String.class);

    body = body.replaceAll("\\t|;",",");

    String bodyCheck = body.replaceAll(",","").trim();
    LOG.info("BODY CHECK: " + bodyCheck);
    if ( bodyCheck.isEmpty() || bodyCheck == null ) {

        throw new IllegalArgumentException("Data record is Empty or NULL. Invalid Data!");

    } else {

        StringBuilder sb = new StringBuilder(body.trim());

        LOG.debug("CSV Format Body In: " + sb.toString());
        LOG.debug("sb length: " + sb.length());

        if ( sb.toString().endsWith(",") ) {

            sb.deleteCharAt(sb.lastIndexOf(",", sb.length()));
        }

        LOG.info("CSV Format Body Out: " + sb.toString());
        sb.append(System.lineSeparator());
        exchange.getIn().setBody(sb.toString());
    }

}

}

*** У меня проблема в том, что мне нужно, чтобы сплиттер завершил обработку sh до тех пор, пока он не достигнет всех пустых строк, или не пропустил или не остановил сплиттер на пустые записи. но мне нужно то, что было ранее разделено или обработано. Бросок и захват исключения останавливает сплиттер, я ничего не получаю. Я использую разделитель stoponexception, но, как говорится, он останавливается на исключении.

спасибо

Ответы [ 2 ]

1 голос
/ 19 февраля 2020

Итак, вы установили stopOnException = true и спросили, почему ваш маршрут остановился, когда исключение не было перехвачено =)? В качестве обходного пути забудьте о создании исключения и проверьте свое тело, а если оно содержит недопустимые данные, просто установите пустое тело, а затем суммируйте их в вашей AggregationStrategy, как в псевдо-маршруте ниже. Я не использовал описание xml очень долгое время, поэтому я надеюсь, что вы поймете этот пример с Java DSL.

public class ExampleRoute extends RouteBuilder {

AggregationStrategy aggregationStrategy = new AggregationStrategy() {
    @Override
    public Exchange aggregate(final Exchange oldExchange, final Exchange newExchange) {
        log.debug("Aggregation Strategy :: start");
        if (oldExchange != null) {
            newExchange.getIn().setBody(newExchange.getIn().getBody(String.class) + oldExchange.getIn().getBody(String.class));
        }
        log.debug("Aggregation Strategy :: finish");
        return newExchange;
    }
};

@Override
public void configure() throws Exception {
    from("{{fileEntranceEndpoint}}")
            .convertBodyTo(String.class)
            .split(tokenize("\n"), aggregationStrategy).streaming().stopOnException()
                .choice()
                .when(body().regex(",+\\$"))
                    .setBody(constant(""))
                .otherwise()
                    .process("toCsvFormat")
    ;
}

Я рекомендую вам использовать Java DSL. Как вы можете видеть, многие вещи просты в использовании.

0 голосов
/ 21 февраля 2020

Спасибо, C0ld. Ценить идущий легко. да, я понял иногда мы делаем глупости, почему другая пара глаз - замечательная вещь. Я принял ваше предложение, и оно работает как шарм. Большое спасибо за ответ.

        <split id="_split1"
            strategyRef="emptyRecordAggregationStrategy" streaming="true">
            <tokenize token="\n"/>
            <choice id="_choice5">
                <when id="_when5">
                    <simple>${body} regex '^,+$'</simple>
                    <setBody id="_setBody1">
                        <constant/>
                    </setBody>
                </when>
                <otherwise>
                  <process id="_processCSV" ref="toCsvFormat"/>
                </otherwise>
            </choice>
        </split>


public class EmptyRecordAggregationStrategy implements AggregationStrategy {

private Logger log = LoggerFactory.getLogger(EmptyRecordAggregationStrategy.class.getName());

@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {

    if ( newExchange.getException() != null ) {
          if ( oldExchange == null ) {  
            newExchange.getIn().setBody(newExchange.getIn().getBody(String.class) + System.lineSeparator());
            return newExchange;
          } else {
            oldExchange.getIn().setBody(oldExchange.getIn().getBody(String.class) + System.lineSeparator());
            return oldExchange;
            }
        }

        if ( oldExchange == null ) {
            newExchange.getIn().setBody(newExchange.getIn().getBody(String.class) + System.lineSeparator());
            return newExchange;
        }

        if ( !newExchange.getIn().getBody(String.class).isEmpty() ) {
          oldExchange.getIn().setBody(oldExchange.getIn().getBody(String.class) + newExchange.getIn().getBody(String.class) + System.lineSeparator());
        }
        return oldExchange;
}

}

...