Переместить файл из входящего адаптера после публикации потока подписки - PullRequest
1 голос
/ 08 мая 2019

Я пытаюсь реализовать следующий поток: 1) файлы читаются с входящего адаптера 2) они отправляются в разные потоки по каналу публикации-подписки с примененной последовательностью 3) файл перемещается после того, как все потоки подписчика готовы

Это основной поток

return IntegrationFlows
                .from(Files.inboundAdapter(inboundOutDirectory)
                           .regexFilter(pattern)
                           .useWatchService(true)
                           .watchEvents(FileReadingMessageSource.WatchEventType.CREATE),
                        e -> e.poller(Pollers.fixedDelay(period)
                                             .taskExecutor(Executors.newFixedThreadPool(poolSize))
                                             .maxMessagesPerPoll(maxMessagesPerPoll)))
                .publishSubscribeChannel(s -> s
                        .applySequence(true)
                        .subscribe(f -> f
                                .transform(Files.toStringTransformer())
                                .<String>handle((p, h) -> {
                                       return "something"
                                    }
                                })                                
                                .channel("consolidateFlow.input"))
                        .subscribe(f -> f
                                .transform(Files.toStringTransformer())
                                .handle(Http.outboundGateway(testUri)
                                            .httpMethod(HttpMethod.GET)
                                            .uriVariable("text", "payload")                                            .expectedResponseType(String.class))
                                .<String>handle((p, h) -> {
                                    return "something";
                                })
                                .channel("consolidateFlow.input")))
                .get();

И агрегация:

public IntegrationFlow consolidateFlow()
return flow -> flow
                .aggregate()
                .<List<String>>handle((p, h) -> "something").log()
    }
}

Использование следующего кода в основном потоке после публикации-подписки

.handle(Files.outboundGateway(this.inboundProcessedDirectory).deleteSourceFiles(true))

заканчивается

Caused by: org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available

Если я пойду с этим, поток консолидации / агрегации вообще не будет достигнут.

.handle(Files.outboundAdapter(this.inboundProcessedDirectory))

Есть идеи, как мне это решить? В настоящее время я перемещаю файл после агрегации, считывая исходное имя файла из заголовка, но это не кажется правильным решением. Я также думал о применении спецификации / совета к входящему адаптеру с логикой success для перемещения файла, но не уверен, что это правильный подход.

EDIT1 По предложению Артема, я добавил еще одного подписчика в публикацию-подписку следующим образом:

...
.channel("consolidateNlpFlow.input"))
                        .subscribe(f -> f
                                .handle(Files.outboundAdapter(this.inboundProcessedDirectory).deleteSourceFiles(true))
...

Файлы перемещены правильно, но ConsolidateFlow не выполняется вообще. Любая идея? Я также попытался добавить канал в новый поток .channel("consolidateNlpFlow.input"), но это не изменило его поведение.

1 Ответ

2 голосов
/ 08 мая 2019

Ваша проблема в том, что consolidateFlow не может вернуть результат в основной поток. Просто потому, что есть что-то похожее на шлюз. Вы делаете явное .channel("consolidateFlow.input"), что означает, что пути назад не будет. Это вопрос, который у вас есть.

Относительно возможного решения.

Согласно вашей конфигурации оба ваших подписчика в publishSubscribeChannel выполняются в одном потоке, один за другим. Таким образом, вам будет очень легко добавить еще одного подписчика с этими Files.outboundAdapter() и deleteSourceFiles(true). Этот будет вызываться уже после существующих подписчиков.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...