Camel route переместить файлы из локальной папки в папку sftp, используя несколько потоков - PullRequest
0 голосов
/ 26 мая 2018

Итак, я перемещаю файлы из локальной папки в папку sftp.Файлы потенциально могут быть также порядка 500 000.У меня есть seda для процесса записи, когда несколько авторов пишут файлы, но перемещение файлов стало узким местом.Ниже приведен мой верблюжий маршрут:

from(props.splitFolderUri)
            .routeId('move-files')
            .log(LoggingLevel.INFO, 'file : ${exchange.getIn().headers[\'CamelFileName\']} :: moving this split file to sftp location ')
            .to(props.remoteFolderUri)
            .log(LoggingLevel.INFO, 'file : ${exchange.getIn().headers[\'CamelFileName\']} :: moved this split file to mailbox')

Я хочу сделать этот маршрут многопоточным, чтобы больше чем один поток продолжал перемещать файлы.Я попытался использовать потоки (10), как показано ниже

rom(props.splitFolderUri)
        .threads(10)
        .routeId('move-files')
        .log(LoggingLevel.INFO, 'file : ${exchange.getIn().headers[\'CamelFileName\']} :: moving this split file to sftp location ')
        .to(props.remoteFolderUri)
        .log(LoggingLevel.INFO, 'file : ${exchange.getIn().headers[\'CamelFileName\']} :: moved this split file to mailbox')

Но у меня произошла ошибка при переименовании файла.после некоторого исследования я обнаружил, что это сценарий, который происходит: Вот сценарий:

Thread#1 retrieves file1, moves to process folder
Thread#2 retrieves same file: file1 at the same time. file1 is deleted
Thread#2 cannot find file1 in source directory, rename fails.
Thread#1 fails due to deleted file by Thread#2

Таким образом, я попытался noop = true, и это работает, но это не хороший выбор для меня, так как файлы в контейнередолжно быть сохранено или у меня очень ограниченная память внутри контейнера для работы.Итак, что может быть хорошим решением относительно того же самого.Я новичок в верблюде, поэтому не могу точно указать, что может быть лучшим способом достижения этого

ОБНОВЛЕНИЕ: Я пытался даже следующим образом:

from(props.splitFolderUri)
   .to("seda:processSplitFiles")

from("seda:processSplitFiles?concurrentConsumers=10")
    .to(log...)
    .to(props.remoteFolderUri)
    ...

Но я продолжаю получать следующую ошибку:

    {"@timestamp":"2018-05-27T22:25:48.220-05:00","@version":1,"message":"Error during commit. Exchange[ID-8c85902bd64c-60121-1527477152921-0-90793]. Caused by:w [org.apache.camel.component.file.GenericFileOperationFailedException - Cannot rename file: GenericFile[/apps/splitFiles/spl/stressTesting.txt_3843_sc_3843_t_10000] to: GenericFile[/apps/splitFiles/spl/.camel/stressTesting.txt_3843_sc_3843_t_10000]]","logger_name":"org.apache.camel.component.file.GenericFileOnCompletion","thread_name":"Camel (camel-1) thread #58 - seda://moveSplitFiles","level":"WARN","level_value":30000,"stack_trace":"org.apache.camel.component.file.GenericFileOperationFailedException: Cannot rename file: GenericFile[/apps/splitFiles/spl/stressTesting.txt_3843_sc_3843_t_10000] to: GenericFile[/apps/splitFiles/spl/.camel/stressTesting.txt_3843_sc_3843_t_10000]
 org.apache.camel.component.file.strategy.GenericFileProcessStrategySupport.renameFile(GenericFileProcessStrategySupport.java:115)
  org.apache.camel.component.file.strategy.GenericFileRenameProcessStrategy.commit(GenericFileRenameProcessStrategy.java:88)
   org.apache.camel.component.file.GenericFileOnCompletion.processStrategyCommit(GenericFileOnCompletion.java:127)
    org.apache.camel.component.file.GenericFileOnCompletion.onCompletion(GenericFileOnCompletion.java:83) org.apache.camel.component.file.GenericFileOnCompletion.onComplete(GenericFileOnCompletion.java:57)
     org.apache.camel.util.UnitOfWorkHelper.doneSynchronizations(UnitOfWorkHelper.java:104)
      org.apache.camel.impl.DefaultUnitOfWork.done(DefaultUnitOfWork.java:229)
       org.apache.camel.util.UnitOfWorkHelper.doneUow(UnitOfWorkHelper.java:65)
        org.apache.camel.processor.CamelInternalProcessor$UnitOfWorkProcessorAdvice.after(CamelInternalProcessor.java:654)
         org.apache.camel.processor.CamelInternalProcessor$UnitOfWorkProcessorAdvice.after(CamelInternalProcessor.java:609)
          org.apache.camel.processor.CamelInternalProcessor$InternalCallback.done(CamelInternalProcessor.java:239)
           org.apache.camel.processor.Pipeline.process(Pipeline.java:109)
            org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:190)
             org.apache.camel.component.seda.SedaConsumer.sendToConsumers(SedaConsumer.java:298)
              org.apache.camel.component.seda.SedaConsumer.doRun(SedaConsumer.java:207)
               org.apache.camel.component.seda.SedaConsumer.run(SedaConsumer.java:154)
                java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
                 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
                  java.lang.Thread.run(Thread.java:748)
                  ","HOSTNAME":"8c85902bd64c"}

Пожалуйста, дайте мне знать, что можно сделать для этого.

Ответы [ 2 ]

0 голосов
/ 28 мая 2018

Вы можете использовать верблюжий агрегатор для безопасной работы с потоками.Пожалуйста, найдите ниже ссылку для агрегатора:

http://camel.apache.org/aggregator.html

0 голосов
/ 27 мая 2018

(Отказ от ответственности: не эксперт, но учится ...)

У меня очень похожий сценарий, описанный с использованием одного потока, чтобы поднять файлы, и Seda-маршрута, чтобы фактически обрабатывать их параллельно.

Примерно так:

from(props.splitFolderUri)
    .to("seda:processSplitFiles");

from("seda:processSplitFiles?concurrentConsumers=10")
    .to(log...)
    .to(props.remoteFolderUri)
    ...
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...