Я запускаю 4 экземпляра приложений на основе Spring Boot Integration на 4 разных серверах.
Процесс такой:
- Чтение файлов XML по одному в общей папке.
- Обработка файла (проверка структуры, содержимого ...), преобразование данных и отправка электронной почты.
- Написать отчет об этом файле в другую общую папку.
- Удалить успешно обработанный файл.
Я ищу неблокирующее и безопасное решение для обработки этих файлов.
Варианты использования:
- Если во время чтения или обработки файла происходит сбой экземпляра (без завершения цепочки интеграции): другой экземпляр должен обработать файл, или тот же экземпляр должен обработать файл после перезапуска.
- Если экземпляр обрабатывает файл, остальные экземпляры не должны обрабатывать файл.
Я создал этот конфигурационный XML-файл Spring Integration (он включает в себя метаданные JDBC с общей базой данных H2):
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-file="http://www.springframework.org/schema/integration/file"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file
http://www.springframework.org/schema/integration/file/spring-integration-file.xsd">
<int:poller default="true" fixed-rate="1000"/>
<int:channel id="inputFilesChannel">
<int:queue/>
</int:channel>
<!-- Input -->
<int-file:inbound-channel-adapter
id="inputFilesAdapter"
channel="inputFilesChannel"
directory="file:${input.files.path}"
ignore-hidden="true"
comparator="lastModifiedFileComparator"
filter="compositeFilter">
<int:poller fixed-rate="10000" max-messages-per-poll="1" task-executor="taskExecutor"/>
</int-file:inbound-channel-adapter>
<task:executor id="taskExecutor" pool-size="1"/>
<!-- Metadatastore -->
<bean id="jdbcDataSource" class="org.apache.commons.dbcp.BasicDataSource">
<property name="url" value="jdbc:h2:file:${database.path}/shared;AUTO_SERVER=TRUE;AUTO_RECONNECT=TRUE;MVCC=TRUE"/>
<property name="driverClassName" value="org.h2.Driver"/>
<property name="username" value="${database.username}"/>
<property name="password" value="${database.password}"/>
<property name="maxIdle" value="4"/>
</bean>
<bean id="jdbcMetadataStore" class="org.springframework.integration.jdbc.metadata.JdbcMetadataStore">
<constructor-arg ref="jdbcDataSource"/>
</bean>
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="jdbcDataSource"/>
</bean>
<bean id="compositeFilter" class="org.springframework.integration.file.filters.CompositeFileListFilter">
<constructor-arg>
<list>
<bean class="org.springframework.integration.file.filters.FileSystemPersistentAcceptOnceFileListFilter">
<constructor-arg index="0" ref="jdbcMetadataStore"/>
<constructor-arg index="1" value="files"/>
</bean>
</list>
</constructor-arg>
</bean>
<!-- Workflow -->
<int:chain input-channel="inputFilesChannel" output-channel="outputFilesChannel">
<int:service-activator ref="fileActivator" method="fileRead"/>
<int:service-activator ref="fileActivator" method="fileProcess"/>
<int:service-activator ref="fileActivator" method="fileAudit"/>
</int:chain>
<bean id="lastModifiedFileComparator" class="org.apache.commons.io.comparator.LastModifiedFileComparator"/>
<int-file:outbound-channel-adapter
id="outputFilesChannel"
directory="file:${output.files.path}"
filename-generator-expression ="payload.name">
<int-file:request-handler-advice-chain>
<bean class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice">
<property name="onSuccessExpressionString" value="headers[file_originalFile].delete()"/>
</bean>
</int-file:request-handler-advice-chain>
</int-file:outbound-channel-adapter>
</beans>
Проблема:
В случае нескольких файлов, когда один файл успешно обработан, транзакция фиксирует другие существующие файлы в метаданных хранилища (таблица INT_METADATA_STORE
). Поэтому, если приложение будет перезапущено, другие файлы никогда не будут обработаны
(работает нормально, если приложение падает при обработке первого файла).
Кажется, это применимо только для чтения файлов, а не для обработки файлов в цепочке интеграции ... Как управлять транзакцией отката для файла сбоя JVM по файлу?
Любая помощь очень ценится. Это сводит меня с ума: (
Спасибо!
Правки / Примечания:
По мотивам https://github.com/caoimhindenais/spring-integration-files/blob/master/src/main/resources/context.xml
Я обновил свою конфигурацию с ответом от Артема Билана. И удалите блок transactional
в блоке poller
: у меня был конфликт транзакций между экземплярами (исключения из уродливой блокировки таблиц). Хотя поведение было таким же.
Я неудачно проверил эту конфигурацию в блоке poller
(то же поведение):
<int:advice-chain>
<tx:advice id="txAdvice" transaction-manager="transactionManager">
<tx:attributes>
<tx:method name="file*" timeout="30000" propagation="REQUIRED"/>
</tx:attributes>
</tx:advice>
</int:advice-chain>
Возможно, решение, основанное на Idempotent Receiver Enterprise Integration Pattern , может работать. Но мне не удалось настроить его ... Я не нахожу точной документации.