Допустим, у нас есть 100K транзакций / записей в CSV-файле.
Мы должны обработать их в пакете из 1000 записей.
После обработки всех пакетов вызовите следующий маршрут или компонент, чтобы сохранить их в пакетах в БД.
В следующем коде класс csvUnmarshalledInstructionsProcessor проверяет записи CSV.И класс инструкцииProcessor сохраняет транзакцию в базе данных.
from(SPLIT_CSV_CORE_ENDPOINT)
.routeId(SPLIT_CSV_CORE_ROUTE)
.transacted()
.convertBodyTo(String.class, "utf-8")
.split().tokenize(RouteConstants.NEW_LINE_DELIMITER, tokenGroupSize)
.aggregationStrategy(new NPDAggregationStrategy())
.stopOnException().parallelProcessing().shareUnitOfWork().streaming()
.doTry()
.unmarshal(csv)
.bean(csvUnmarshalledInstructionsProcessor)
.bean(instructionProcessor)
.doCatch(Exception.class)
.bean(exceptionPropagator)
.end()
.end()
.end();
Нет проблем при возникновении ошибки в первом пакете.Когда система обнаруживает проблему в первом пакете, она выдает исключение и не переходит дальше.
Здесь проблема заключается в том, что если какой-либо пакет после первого пакета имеет какую-либо ошибку проверки, код сохраняется 1000записи из всех предыдущих действительных партий.Это делает данные несогласованными, так как мы отклоняем файл или полезную нагрузку, когда система видит недопустимую запись в пакете (что будет после первого пакета).Но мы также сохраняем транзакции или записи действительных пакетов в БД.
Итак, можем ли мы проверять записи сначала в пакетах, а после проверки всех записей по 100 КБ вызовет другой маршрут для сохранения их в пакетах снова?