Дождитесь завершения работы адаптера исходящего канала jdbc перед дальнейшей обработкой - PullRequest
0 голосов
/ 24 апреля 2018

Я новичок в Spring Integration и экспериментирую с различными компонентами в небольшом проекте.

В данной задаче мне нужно обработать текстовый файл и сохранить его содержимое в базе данных.Файл содержит строки, которые можно сгруппировать, поэтому будет естественным разделить каждый файл на несколько независимых сообщений.

Это весь процесс (см. В конце конфигурацию):

  1. сделать первоначальный анализ файла;
    • сделано transformers.outcomeTransf
  2. сохранить некоторые данные в базе данных (например, имя файла, дата файла и т. Д.);
    • ?
  3. разбить содержимое файла на несколько отдельных сообщений;
    • сделано splitters.outcomeSplit
  4. дальнейший анализ каждого сообщения;
    • сделано transformers.SingleoutcomeToMap
  5. сохранить данные одного сообщения в базе данных, ссылаясь на данные, сохраненные на шаге 1.
    • выполнено stored-proc-outbound-channel-adapter

База данных содержит только две таблицы:

  • T1 для метаданных файла (имя файла, дата файла, источник файла, ...);
  • T2 для деталей содержимого файла, здесь строки ссылаются на строки в T1.

Мне не хватает компонента для шага 2. Насколько я понимаю, исходящий адаптер канала «проглатывает» это сообщение.обрабатывает, так что никакая другая конечная точка не может его получить.

Я думал о канале публикации-подписки (без TaskExecutor) после первого шага с исходящим адаптером jdbc как первый подписчик, а сплиттер из ствола 3 - как второй: каждый подписанный обработчик должен затем получить копию сообщения, но мне неясно, будет ли какая-либо обработка в разделителе ждать завершения исходящего адаптера.

Это правильный подход к задаче?Что если преобразователь на шаге 4 вызывается асинхронно - каждое разделенное сообщение является автономным, и это потребует параллелизма.


Spring-конфигурация:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:int-file="http://www.springframework.org/schema/integration/file"
    xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc"
    xmlns:beans="http://www.springframework.org/schema/beans"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/integration
            http://www.springframework.org/schema/integration/spring-integration.xsd
            http://www.springframework.org/schema/integration/file
            http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
            http://www.springframework.org/schema/integration/jdbc
            http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc.xsd
            http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- input-files-from-folder -->
    <int-file:inbound-channel-adapter id="outcomeIn" 
                                  directory="file:/in-outcome">
        <int:poller id="poller" fixed-delay="2500" />
    </int-file:inbound-channel-adapter>

    <int:transformer input-channel="outcomeIn" output-channel="outcomesChannel" method="transform">
        <beans:bean class="transformers.outcomeTransf" />
    </int:transformer>

    <!--  save source to db! -->

    <int:splitter input-channel="outcomesChannel" output-channel="singleoutcomeChannel" method="splitMessage">
        <beans:bean class="splitters.outcomeSplit" />
    </int:splitter>

    <int:transformer input-channel="singleoutcomeChannel" output-channel="jdbcChannel" method="transform">
        <beans:bean class="transformers.SingleoutcomeToMap" />
    </int:transformer>

    <int-jdbc:stored-proc-outbound-channel-adapter   
        data-source="dataSource" channel="jdbcChannel" stored-procedure-name="insert_outcome" 
        ignore-column-meta-data="true">

        <int-jdbc:sql-parameter-definitions ... />

        <int-jdbc:parameter ... />

    </int-jdbc:stored-proc-outbound-channel-adapter>

    <bean id="dataSource" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close" >
        <property name="driverClassName" value="org.postgresql.Driver"/>
        <property ... />
    </bean>

</beans>

1 Ответ

0 голосов
/ 24 апреля 2018

Вы думаете, что правильно. Когда у вас есть PublishSubscribeChannel без Executor, каждый следующий подписчик будет ждать, когда предыдущий закончит свою работу. Поэтому ваш spllitter не будет вызываться, пока все не будет сделано на БД. Более того, по умолчанию, когда первый подписчик не может обработать сообщение (не соединение с БД?), Все остальные не будут вызываться.

Другой способ достижения аналогичного поведения можно настроить с помощью <request-handler-advice-chain> и ExpressionEvaluatingRequestHandlerAdvice: https://docs.spring.io/spring-integration/docs/5.0.4.RELEASE/reference/html/messaging-endpoints-chapter.html#expression-advice

Весь параллелизм и многопоточность нисходящего потока сплиттера уже не связаны с логикой БД. Параллелизм не произойдет, пока БД не выполнит свой запрос должным образом.

...