Верблюд: обрабатывать файлы, разбивать и объединять - PullRequest
0 голосов
/ 26 мая 2019

У меня есть ситуация, когда мне нужно читать из каталога и обрабатывать все XML-файлы там. Затем файлы обрабатываются и разделяются по некоторому правилу, что приводит к появлению N новых сообщений / файлов. Я хотел бы объединить все новые документы в индексный файл с одной строкой на файл. Как только пакет будет обработан, я позвоню в веб-службу и сообщу ему, чтобы он загрузил индексный файл.

Проблема, с которой я сталкиваюсь, заключается в том, что Camel не видит доставку файлов как одно задание, и моя агрегация вызывается несколько раз, в результате чего вместо одного файла получается несколько индексных файлов. Я попытался использовать свойство Exchange.BATCH_SIZE и умножить его на Exchange.SPLIT_SIZE для агрегации completionSize

Псевдо-код:

          from("file://" + SOURCE_FOLDER)
             .threads(10)
             .convertBodyTo(Data.class)
             .process(myProcessor)
             .split(xpath(MAIN_NODE))
             .parallelProcessing()
             .to(MyRouter.ENDPOINT)
             .setProperty(TOTAL)
               .spel(String.format("%s * %s", Exchange.SPLIT_SIZE, FileRouter.NUM_FILES))
             .aggregate(constant(true), myAggregator)
....

Итак, вопрос: как я могу определить границы обмена / группировки? Я знаю , что будет доставляться только 1 файл за раз, но как я могу сказать, что Верблюд?

Я пытался работать только с 1 потоком, но мне кажется, что это не влияет на этот вопрос.

У меня есть возможность доставки в виде tar.gz - это будет проще? Мне кажется, у меня была бы та же проблема.

 from("file://" + SOURCE_FOLDER + "/tar.gz?consumer.delay=1000&noop=true")
                .streamCaching()
                .unmarshal()
                .gzip()
                .split(new TarSplitter())
                ....<SAME ISSUE>

Я использую Camel 3 Preview, хотя я также пытался использовать последнюю версию 2.x.

Спасибо заранее и наилучшим образом,

1 Ответ

0 голосов
/ 27 мая 2019

Когда я вас правильно понимаю, ваша проблема в том, что вы не можете создать агрегированный индекс для каждого входного файла .

Ну, верблюд Splitter EIP можно комбинировать с стратегией агрегирования .

Если вы сделаете это, Splitter повторно объединит все части ранее разделенного сообщения в новый агрегат .

// a Splitter with an AggregationStrategy
.split([yourSplitCriteria], new MyAggregationStrategy())
    // you can do whatever you want in here with the message parts
    // for example each splitted message part is sent to this bean 
    .to("bean:PartProcessor")
    // end the split to re-aggregate the message parts
.end()
// here, after the split-ending you get the re-aggregated messages
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...