Spring Integration: повторная настройка с несколькими экземплярами - PullRequest
0 голосов
/ 12 ноября 2018

Я запускаю 4 экземпляра приложений на основе Spring Boot Integration на 4 разных серверах. Процесс такой:

  1. Чтение файлов XML по одному в общей папке.
  2. Обработка файла (проверка структуры, содержимого ...), преобразование данных и отправка электронной почты.
  3. Написать отчет об этом файле в другую общую папку.
  4. Удалить успешно обработанный файл.

Я ищу неблокирующее и безопасное решение для обработки этих файлов.

Варианты использования:

  • Если во время чтения или обработки файла происходит сбой экземпляра (без завершения цепочки интеграции): другой экземпляр должен обработать файл, или тот же экземпляр должен обработать файл после перезапуска.
  • Если экземпляр обрабатывает файл, остальные экземпляры не должны обрабатывать файл.

Я создал этот конфигурационный XML-файл Spring Integration (он включает в себя метаданные JDBC с общей базой данных H2):

    <?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:int-file="http://www.springframework.org/schema/integration/file"
    xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    http://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/file
    http://www.springframework.org/schema/integration/file/spring-integration-file.xsd">

<int:poller default="true" fixed-rate="1000"/>
<int:channel id="inputFilesChannel">
    <int:queue/>
</int:channel>

<!-- Input -->
<int-file:inbound-channel-adapter 
    id="inputFilesAdapter"
    channel="inputFilesChannel"
    directory="file:${input.files.path}" 
    ignore-hidden="true"
    comparator="lastModifiedFileComparator"
    filter="compositeFilter">
   <int:poller fixed-rate="10000" max-messages-per-poll="1"  task-executor="taskExecutor"/>
</int-file:inbound-channel-adapter>

<task:executor id="taskExecutor" pool-size="1"/>

<!-- Metadatastore -->
<bean id="jdbcDataSource" class="org.apache.commons.dbcp.BasicDataSource">
    <property name="url" value="jdbc:h2:file:${database.path}/shared;AUTO_SERVER=TRUE;AUTO_RECONNECT=TRUE;MVCC=TRUE"/>
    <property name="driverClassName" value="org.h2.Driver"/>
    <property name="username" value="${database.username}"/>
    <property name="password" value="${database.password}"/>
    <property name="maxIdle" value="4"/>
</bean>

<bean id="jdbcMetadataStore" class="org.springframework.integration.jdbc.metadata.JdbcMetadataStore">
    <constructor-arg ref="jdbcDataSource"/>
</bean>

<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    <property name="dataSource" ref="jdbcDataSource"/>
</bean>

<bean id="compositeFilter" class="org.springframework.integration.file.filters.CompositeFileListFilter">
    <constructor-arg>
        <list>
            <bean class="org.springframework.integration.file.filters.FileSystemPersistentAcceptOnceFileListFilter">
                <constructor-arg index="0" ref="jdbcMetadataStore"/>
                <constructor-arg index="1" value="files"/>
            </bean>
        </list>
    </constructor-arg>
</bean>

<!-- Workflow -->
<int:chain input-channel="inputFilesChannel" output-channel="outputFilesChannel">
    <int:service-activator ref="fileActivator" method="fileRead"/>
    <int:service-activator ref="fileActivator" method="fileProcess"/>
    <int:service-activator ref="fileActivator" method="fileAudit"/>
</int:chain>

<bean id="lastModifiedFileComparator" class="org.apache.commons.io.comparator.LastModifiedFileComparator"/>

<int-file:outbound-channel-adapter 
    id="outputFilesChannel" 
    directory="file:${output.files.path}"
    filename-generator-expression ="payload.name">
    <int-file:request-handler-advice-chain>
        <bean class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice">
             <property name="onSuccessExpressionString" value="headers[file_originalFile].delete()"/>
        </bean>
    </int-file:request-handler-advice-chain>
</int-file:outbound-channel-adapter>

</beans>

Проблема: В случае нескольких файлов, когда один файл успешно обработан, транзакция фиксирует другие существующие файлы в метаданных хранилища (таблица INT_METADATA_STORE). Поэтому, если приложение будет перезапущено, другие файлы никогда не будут обработаны (работает нормально, если приложение падает при обработке первого файла). Кажется, это применимо только для чтения файлов, а не для обработки файлов в цепочке интеграции ... Как управлять транзакцией отката для файла сбоя JVM по файлу?

Любая помощь очень ценится. Это сводит меня с ума: (

Спасибо!

Правки / Примечания:

  • По мотивам https://github.com/caoimhindenais/spring-integration-files/blob/master/src/main/resources/context.xml

  • Я обновил свою конфигурацию с ответом от Артема Билана. И удалите блок transactional в блоке poller: у меня был конфликт транзакций между экземплярами (исключения из уродливой блокировки таблиц). Хотя поведение было таким же.

  • Я неудачно проверил эту конфигурацию в блоке poller (то же поведение):

    <int:advice-chain>
        <tx:advice id="txAdvice" transaction-manager="transactionManager">
            <tx:attributes>
                <tx:method name="file*" timeout="30000" propagation="REQUIRED"/>
            </tx:attributes>
        </tx:advice>
    </int:advice-chain>
    
  • Возможно, решение, основанное на Idempotent Receiver Enterprise Integration Pattern , может работать. Но мне не удалось настроить его ... Я не нахожу точной документации.

Ответы [ 2 ]

0 голосов
/ 15 ноября 2018

Хорошо. Я нашел рабочее решение. Может быть, не самый чистый, но он работает:

  • Несколько экземпляров на отдельных серверах, совместно использующих одну и ту же базу данных H2 (подключение к сетевой папке). Я думаю, что это должно работать через удаленный TCP. MVCC был активирован на H2 (проверьте его doc ).
  • inbound-channel-adapter имеет активированную опцию scan-each-poll, позволяющую перезаписывать файлы, которые ранее можно было игнорировать (если процесс уже запущен другим экземпляром). Таким образом, в случае сбоя другого экземпляра файл может быть снова опрошен и обработан без перезапуска для этого самого экземпляра.
  • Опция defaultAutoCommit установлена ​​в false в БД.
  • Я не использовал FileSystemPersistentAcceptOnceFileListFilter, поскольку он агрегировал все прочитанные файлы в метаданных хранилища, когда один файл был успешно обработан. Мне не удалось использовать это в моем контексте ...
  • Я написал свои собственные условия и действия в выражениях через фильтр и синхронизацию транзакций.

    <!-- Input -->
    <bean id="lastModifiedFileComparator" class="org.apache.commons.io.comparator.LastModifiedFileComparator"/>
    <int-file:inbound-channel-adapter
        id="inputAdapter"
        channel="inputChannel"
        directory="file:${input.files.path}"
        comparator="lastModifiedFileComparator"
        scan-each-poll="true">
       <int:poller max-messages-per-poll="1" fixed-rate="5000">
            <int:transactional transaction-manager="transactionManager"  isolation="READ_COMMITTED" propagation="REQUIRED" timeout="60000" synchronization-factory="syncFactory"/>
       </int:poller>
    </int-file:inbound-channel-adapter>
    
    <!-- Continue only if the concurrentmetadatastore doesn't contain the file. If if is not the case : insert it in the metadatastore -->
    <int:filter input-channel="inputChannel" output-channel="processChannel"  discard-channel="nullChannel" throw-exception-on-rejection="false" expression="@jdbcMetadataStore.putIfAbsent(headers[file_name], headers[timestamp]) == null"/>
    
    <!-- Rollback by removing the file from the metadatastore -->
    <int:transaction-synchronization-factory id="syncFactory">
        <int:after-rollback expression="@jdbcMetadataStore.remove(headers[file_name])" />
    </int:transaction-synchronization-factory>
    
    <!-- Metadatastore configuration -->
    <bean id="jdbcDataSource" class="org.apache.commons.dbcp.BasicDataSource">
        <property name="url" value="jdbc:h2:file:${database.path}/shared;AUTO_SERVER=TRUE;AUTO_RECONNECT=TRUE;MVCC=TRUE"/>
        <property name="driverClassName" value="org.h2.Driver"/>
        <property name="username" value="${database.username}"/>
        <property name="password" value="${database.password}"/>
        <property name="maxIdle" value="4"/>
        <property name="defaultAutoCommit" value="false"/>
    </bean>
    
    <bean id="jdbcMetadataStore" class="org.springframework.integration.jdbc.metadata.JdbcMetadataStore">
        <constructor-arg ref="jdbcDataSource"/>
    </bean>
    
    <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="jdbcDataSource"/>
    </bean>
    
    <!-- Workflow -->
    <int:chain input-channel="processChannel" output-channel="outputChannel">
        <int:service-activator ref="fileActivator" method="fileRead"/>
        <int:service-activator ref="fileActivator" method="fileProcess"/>
        <int:service-activator ref="fileActivator" method="fileAudit"/>
    </int:chain>
    
    
    <!-- Output -->
    <int-file:outbound-channel-adapter 
        id="outputChannel" 
        directory="file:${output.files.path}"
        filename-generator-expression ="payload.name">
        <!-- Delete the source file -->
        <int-file:request-handler-advice-chain>
            <bean class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice">
                 <property name="onSuccessExpressionString" value="headers[file_originalFile].delete()"/>
            </bean>
        </int-file:request-handler-advice-chain>
    </int-file:outbound-channel-adapter>
    

Любые улучшения или другие решения приветствуются.

0 голосов
/ 12 ноября 2018

Вы не должны использовать PseudoTransactionManager, но DataSourceTransactionManager вместо.

Так как вы используете JdbcMetadataStore, он будет участвовать в транзакции, и в случае сбоя нисходящего потока запись в хранилище метаданных также будет откатана.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...