Spring Integration - Filter - отправка сообщений в другую конечную точку - PullRequest
0 голосов
/ 25 сентября 2019

Мой входной файл представляет собой CSV-файл, как показано ниже:

USER_ID, USER_NAME, FREQUENCY, FREQUENCY_DETAIL
A123, AAA, ANNUALLY, 1-JUN
B123, BBB, INVALID_FREQUENCY, 21-JUN
C123, CCC, ANNUALLY, APR
D123, DDD, WEEKLY, 1-DEC

Проверки:

USER_ID -> alphanumeric
USERNAME -> alphabets only
FREQUENCY -> must be one of DAILY, WEEKLY, MONTHLY , ANNUALLY
FREQUENCY DETAIL -> Pattern \\d{1,2}-\\w{3}

Мой Бин выглядит следующим образом:

class UserBean {
    String userID;
    String userName;
    String frequency;
    String frequencyDetail;

    String status = "SUCCESS"; //Code generated. If any of the above fields is not valid, set as "ERROR, <field that failed>". E.g.: ERROR, FREQUENCY_DETAIL
}

Мой потоквыглядит следующим образом:

  1. Чтение CSV-файла из папки
  2. Преобразование в UserBean (с использованием openCsv)
  3. Если состояние какого-либо компонента содержит «ОШИБКА»,запись в отдельный канал с именем errorSummaryReportChannel.

Эти каналы записывают в файл userID + status.

4.Для всех bean-компонентов (с status = "SUCCESS" или status = "ERROR") преобразуйте в JSON и выведите в журнал.

ГДЕ НУЖНА ПОМОЩЬ:

Шаг 3

Если userBean имеет статус = "ОШИБКА", напишите в "errorSummaryReportChannel".Для всех состояний продолжить нормальный поток.

Я думаю, что необходимо добавить выходной канал и канал сброса, но не смог найти ни одного примера.

Мой код:

@Configuration
public class CreateUserConfiguration {
    @Bean
    public IntegrationFlow createUser() {
        return IntegrationFlows.from(Files.inboundAdapter(new File(INPUT_DIR)))
                .enrichHeaders(h -> h.header("errorChannel", "exceptionChannel", true))
                .transform(csvToUserBeanTransformer, "convertCsvToUserBean")
                .split(userBeanSplitter, "splitUserBeans")
                .wireTap(flow -> flow.<PrimeAOBean>filter(primeAOBean -> primeAOBean.getStatus().equalsIgnoreCase("SUCCESS"), errorFlow -> errorFlow.discardChannel("errorSummaryReportGenerationChannel")))
                .transform(userBeanToJSONTransformer, "convertUserBeanToJSON")
                .handle(Files.outboundAdapter(new File(OUTPUT_SUCCESS_DIRECTORY)))
                .get();
    }

    @Bean
    public IntegrationFlow logErrorSummary() {
        return IntegrationFlows.from("errorSummaryReportGenerationChannel")
                .handle((p,h) -> {
                    return ((UserBean)(p)).getUserID() + "\t" + ((UserBean)(p)).getStatus();
                })
                .transform(Transformers.objectToString())
                .handle(Files.outboundAdapter(new File(OUTPUT_FAILED_REPORT_FILE_NAME)))
                .get();
    }

@Bean
public IntegrationFlow logError() {
    return IntegrationFlows.from("exceptionChannel")
            .enrichHeaders(h -> h.headerExpression("errorFileName", "payload.failedMessage.headers.fileName"))
            .transform(Transformers.toJson())
            .handle(Files.outboundAdapter(new File(generateOutputDirectory(OUTPUT_FAILED_DIRECTORY))))
            .get();
}

}

Может кто-нибудь помочь, пожалуйста?

1 Ответ

0 голосов
/ 25 сентября 2019

Выходной канал для filter (или любой другой конечной точки) в потоке - это канал, автоматически созданный перед конечной точкой в ​​потоке.Итак, в вашем случае это выглядит так:

 .filter()
 .transform()

Всякий раз, когда сообщение проходит фильтр, оно отправляется в этот преобразователь.Мы можем рассматривать его как SUCCESS процесс.

Теперь давайте напишем функцию фильтра, прежде всего, чтобы эти сообщения работали правильно!

Для вас есть такой метод:

<code>/**
 * Populate a {@link MessageFilter} with {@link MethodInvokingSelector}
 * for the provided {@link GenericSelector}.
 * Typically used with a Java 8 Lambda expression:
 * <pre class="code">
 * {@code
 *  .filter("World"::equals)
 * }
 * 
* Используйте {@link #filter (Class, GenericSelector)}, если вам нужен доступ ко всему * сообщению.* @param genericSelector {@link GenericSelector} для использования.* @param

исходный тип полезной нагрузки.* @ возврат текущего {@link IntegrationFlowDefinition}.* / public

B фильтр (GenericSelector

genericSelector) {

Итак, наша задача - реализовать это GenericSelector для вашего UserBean:

 .<UserBean>filter(userBean -> userBean.getStatus().equalsIgnoreCase("SUCCESS"))

Чтобы отправить все остальные (ERROR) сообщения на канал сброса (errorSummaryReportGenerationChannel), нам нужно как-то добавить эту опцию discardChannel в фильтр.И для этой цели в Java DSL есть перегруженный filter() метод:

.<UserBean>filter(userBean -> userBean.getStatus().equalsIgnoreCase("SUCCESS"),
               endpoint -> endpoint.discardChannel("errorSummaryReportGenerationChannel"))

См. Их JavaDocs для получения дополнительной информации.

...