Я пытаюсь реализовать следующий поток:
1) файлы читаются с входящего адаптера
2) они отправляются в разные потоки по каналу публикации-подписки с примененной последовательностью
3) файл перемещается после того, как все потоки подписчика готовы
Это основной поток
return IntegrationFlows
.from(Files.inboundAdapter(inboundOutDirectory)
.regexFilter(pattern)
.useWatchService(true)
.watchEvents(FileReadingMessageSource.WatchEventType.CREATE),
e -> e.poller(Pollers.fixedDelay(period)
.taskExecutor(Executors.newFixedThreadPool(poolSize))
.maxMessagesPerPoll(maxMessagesPerPoll)))
.publishSubscribeChannel(s -> s
.applySequence(true)
.subscribe(f -> f
.transform(Files.toStringTransformer())
.<String>handle((p, h) -> {
return "something"
}
})
.channel("consolidateFlow.input"))
.subscribe(f -> f
.transform(Files.toStringTransformer())
.handle(Http.outboundGateway(testUri)
.httpMethod(HttpMethod.GET)
.uriVariable("text", "payload") .expectedResponseType(String.class))
.<String>handle((p, h) -> {
return "something";
})
.channel("consolidateFlow.input")))
.get();
И агрегация:
public IntegrationFlow consolidateFlow()
return flow -> flow
.aggregate()
.<List<String>>handle((p, h) -> "something").log()
}
}
Использование следующего кода в основном потоке после публикации-подписки
.handle(Files.outboundGateway(this.inboundProcessedDirectory).deleteSourceFiles(true))
заканчивается
Caused by: org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
Если я пойду с этим, поток консолидации / агрегации вообще не будет достигнут.
.handle(Files.outboundAdapter(this.inboundProcessedDirectory))
Есть идеи, как мне это решить? В настоящее время я перемещаю файл после агрегации, считывая исходное имя файла из заголовка, но это не кажется правильным решением.
Я также думал о применении спецификации / совета к входящему адаптеру с логикой success для перемещения файла, но не уверен, что это правильный подход.
EDIT1
По предложению Артема, я добавил еще одного подписчика в публикацию-подписку следующим образом:
...
.channel("consolidateNlpFlow.input"))
.subscribe(f -> f
.handle(Files.outboundAdapter(this.inboundProcessedDirectory).deleteSourceFiles(true))
...
Файлы перемещены правильно, но ConsolidateFlow не выполняется вообще. Любая идея?
Я также попытался добавить канал в новый поток .channel("consolidateNlpFlow.input")
, но это не изменило его поведение.