Условное перемещение файлов в Spring Integration File Support - PullRequest
1 голос
/ 31 марта 2020

Я реализую один рабочий поток Spring Integration, как показано ниже.

IntegrationFlows.from("inputFileProcessorChannel")
                           .split(fileSplitterSpec, spec -> {})
                           .transform(lineItemTransformer)
                           .handle(httpRequestExecutingMessageHandler)
                           .transform(reportDataAggregator)
                           .aggregate(aggregatorSpec -> aggregatorSpec.requiresReply(false))
                           .channel("reportGeneratorChannel")
                           .get();

Теперь, когда вышеуказанный поток завершен, мне нужно переместить input file в каталог архива. Решение о выборе каталога назначения основано на заголовке сообщения processingFailed, и этот заголовок добавляется на шаге .transform(reportDataAggregator) в потоке. Чтобы переместить эти файлы, я создал еще один поток, как показано ниже: код

 IntegrationFlows.from(MessageChannels.direct("inputFileProcessorChannel"))
                           .routeToRecipients(routerSpec -> {
                               routerSpec.recipient("processedFileMoverChannel", createMessageSelector(Boolean.FALSE))
                                         .recipient("failedFileMoverChannel", createMessageSelector(Boolean.TRUE));
                           })

                           .get();  

Метод выбора

 private MessageSelector createMessageSelector(Boolean ruleBoolean) {
    return message -> ruleBoolean.equals(message.getHeaders().get("processingFailed"));
}

Поток канала отчета ниже

IntegrationFlows.from("reportGeneratorChannel")
                           .transform(executionReportTransformer)
                           .handle(reportWritingMessageHandlerSpec)
                           .get();

Но, как и ожидалось с этим потоком перемещение файла не выполняется, поскольку указанный заголовок не присутствует в выполнении потока.

Итак, как достичь этой цели, чтобы выполнить file mover flow после создания файла отчета?

1 Ответ

2 голосов
/ 31 марта 2020

FileSplitter заполняет для нас следующие заголовки для каждой строки:

@Override
protected boolean willAddHeaders(Message<?> message) {
    Object payload = message.getPayload();
    return payload instanceof File || payload instanceof String;
}

@Override
protected void addHeaders(Message<?> message, Map<String, Object> headers) {
    File file = null;
    if (message.getPayload() instanceof File) {
        file = (File) message.getPayload();
    }
    else if (message.getPayload() instanceof String) {
        file = new File((String) message.getPayload());
    }
    if (file != null) {
        if (!headers.containsKey(FileHeaders.ORIGINAL_FILE)) {
            headers.put(FileHeaders.ORIGINAL_FILE, file);
        }
        if (!headers.containsKey(FileHeaders.FILENAME)) {
            headers.put(FileHeaders.FILENAME, file.getName());
        }
    }
}

Итак, даже если вы закончили с агрегацией и готовы отправить сообщение в этот .channel("reportGeneratorChannel"), вы все равно иметь доступ к этим заголовкам, связанным с файлами.

Создание этого reportGeneratorChannel в виде PublishSubscribeChannel и перемещение этого «потока файлового движителя» туда помогут вам.

Кстати: то, что у вас есть до сих пор с IntegrationFlows.from(MessageChannels.direct("inputFileProcessorChannel")) и вторым потоком на том же канале, приведет вас к циклической диспетчеризации. Это не паб-суб-распространение. Больше информации в документах: https://docs.spring.io/spring-integration/docs/current/reference/html/core.html#channel

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...