Apache Camel, компонент sFTP и идемпотентный репозиторий - PullRequest
4 голосов
/ 03 августа 2020

Я хочу создать приложение в режиме высокой доступности (HA). Это означает, что у меня может быть изменяемое количество экземпляров, т.е. мне нужно иметь 5 экземпляров приложения. Приложение должно читать данные из ftp / sftp, избегая дублирования (один файл нельзя обработать 2 раза). Чтобы решить эту проблему, я решил использовать кластерные маршруты верблюдов в настройке active/active. В этой настройке используется репозиторий Idempotent.

Ниже приведена конфигурация моего репозитория Idempotent (я использую весеннюю загрузку, а также стартеры весенней загрузки, sql и ftp)

@Configuration
public class IdempotentRepoConf {

    @Autowired
    private DataSource dataSource;

    @Bean
    public JdbcMessageIdRepository sftpProcesorName() {
        return new JdbcMessageIdRepository(dataSource, "sftpProcesorName");
    }

}

И мои Маршрутизатор

@Component
public class FooSftpRouter extends RouteBuilder {

    @Autowired
    private IdempotentRepository idempotentRepository;

    @Override
    public void configure() throws Exception {
        from("sftp:localhost:2221/upload/files/foo?username=foo" +
                "&password=pass" +
                "&move=./.done" +
                "&moveFailed=.error" +
                "&idempotentRepository=#sftpProcesorName")
                .idempotentConsumer(header(Exchange.FILE_NAME),idempotentRepository)
                    .to("sftp:localhost:2221/upload/files/bar?username=foo&password=pass")
                .end();

    }
}

Когда я запускаю только один экземпляр, все работает без проблем, но когда я запускаю более одного экземпляра, у меня возникают проблемы с ошибкой

2020-08-03 15:14:14.589  WARN 18071 --- [pload/files/foo] o.a.c.c.file.remote.SftpConsumer         : Error processing file RemoteFile[Foo 4 ] due to Cannot retrieve file: upload/files/foo/Foo 4 . Caused by: [org.apache.camel.component.file.GenericFileOperationFailedException - Cannot retrieve file: upload/files/foo/Foo 4 ]

org.apache.camel.component.file.GenericFileOperationFailedException: Cannot retrieve file: upload/files/foo/Foo 4 
    at org.apache.camel.component.file.remote.SftpOperations.retrieveFileToStreamInBody(SftpOperations.java:778) ~[camel-ftp-3.2.0.jar:3.2.0]
    at org.apache.camel.component.file.remote.SftpOperations.retrieveFile(SftpOperations.java:717) ~[camel-ftp-3.2.0.jar:3.2.0]
    at org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:434) ~[camel-file-3.2.0.jar:3.2.0]
    at org.apache.camel.component.file.remote.RemoteFileConsumer.processExchange(RemoteFileConsumer.java:145) ~[camel-ftp-3.2.0.jar:3.2.0]
    at org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:234) ~[camel-file-3.2.0.jar:3.2.0]
    at org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:196) ~[camel-file-3.2.0.jar:3.2.0]
    at org.apache.camel.support.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:187) ~[camel-support-3.2.0.jar:3.2.0]
    at org.apache.camel.support.ScheduledPollConsumer.run(ScheduledPollConsumer.java:106) ~[camel-support-3.2.0.jar:3.2.0]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[na:na]
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: com.jcraft.jsch.SftpException: No such file
    at com.jcraft.jsch.ChannelSftp.throwStatusError(ChannelSftp.java:2873) ~[jsch-0.1.55.jar:na]
    at com.jcraft.jsch.ChannelSftp._stat(ChannelSftp.java:2225) ~[jsch-0.1.55.jar:na]
    at com.jcraft.jsch.ChannelSftp.get(ChannelSftp.java:1318) ~[jsch-0.1.55.jar:na]
    at com.jcraft.jsch.ChannelSftp.get(ChannelSftp.java:1290) ~[jsch-0.1.55.jar:na]
    at org.apache.camel.component.file.remote.SftpOperations.retrieveFileToStreamInBody(SftpOperations.java:759) ~[camel-ftp-3.2.0.jar:3.2.0]
    ... 13 common frames omitted

Чтобы получить это предупреждение, я выполните следующие действия

  1. Поместите несколько файлов в sftp в локализацию /home/foo/upload/files/foo (т.е.
for i in {1..10}; do  touch "Foo $i "; done;
Проверить базу данных (postgress)
select * from camel_messageprocessed;

и, как я ожидал, у меня есть 10 записей

  processorname   | messageid |        createdat        
------------------+-----------+-------------------------
 sftpProcesorName | Foo 1     | 2020-08-03 15:14:13.392
 sftpProcesorName | Foo 10    | 2020-08-03 15:14:13.607
 sftpProcesorName | Foo 9     | 2020-08-03 15:14:14.409
 sftpProcesorName | Foo 6     | 2020-08-03 15:14:14.419
 sftpProcesorName | Foo 8     | 2020-08-03 15:14:14.427
 sftpProcesorName | Foo 2     | 2020-08-03 15:14:14.435
 sftpProcesorName | Foo 3     | 2020-08-03 15:14:14.447
 sftpProcesorName | Foo 5     | 2020-08-03 15:14:14.455
 sftpProcesorName | Foo 4     | 2020-08-03 15:14:14.462
 sftpProcesorName | Foo 7     | 2020-08-03 15:14:14.469
(10 rows)

, но в журналах я вижу несколько раз предупреждения и ошибки

org.apache.camel.component.file.GenericFileOperationFailedException: Cannot retrieve file: upload/files/foo/Foo 3

и

Caused by: com.jcraft.jsch.SftpException: No such file

У моего build.gradle есть зависимость

    compile 'org.apache.camel.springboot:camel-ftp-starter:3.2.0'
    compile 'org.apache.camel.springboot:camel-sql-starter:3.2.0'

Я также пробовал с опцией readLock=idempotent, но эта опция в документации составляет (only for file component) и, вероятно, не работает в компоненте ftp / sftp

Итак, моя проблема, чтобы избежать дублирования и обрабатывать только один раз, все еще не решена.

что я делаю не так?

1 Ответ

2 голосов
/ 03 августа 2020

Очевидно, что ваши процессы конкурируют. Я не думаю, что это что-то делает

&idempotentRepository=#sftpProcesorName

, а idempotentConsumer() DSL работает только после SFTP.

Некоторые элементы можно попробовать из памяти :

  • readLock=fileLock # скорее всего не сработает
  • readlock=markerFile # может быть все, что вам нужно для «в основном нормально» работы
  • inProgressRepository=#jdbcRepository должно быть железо -крытый раствор.
...