Прежде всего, этот вопрос кажется почти в точности тем, что мы хотим сделать: Параллельное пошаговое выполнение ItemStreamReader в SpringBatch
Пакет Spring 3.0.10, работающий под управлением Java EE-сервер (WebSphere, Java 8, Java EE 6). Я также не являюсь первоначальным автором этого кода.
XML конфигурация, у нас есть пакетный шаг с чтением строки из файла, обработка строки (часть, которую мы хотим многопоточность ) и запись записи в базу данных:
<batch:step id="processRenewalsStep" next="saveResponseFileStep">
<batch:tasklet task-executor="taskExecutor" throttle-limit="4">
<batch:chunk
reader="batchRenewalCsvFileItemReader"
writer="asyncBatchRenewalDb2ItemWriter"
processor="asyncBatchRenewalProcessor"
commit-interval="1"
skip-limit="10">
<batch:skippable-exception-classes>
<batch:include class="java.lang.Exception"/>
</batch:skippable-exception-classes>
<batch:listeners>
<batch:listener ref="batchSkipListener"/>
</batch:listeners>
</batch:chunk>
</batch:tasklet>
</batch:step>
Даже после выполнения предложенной синхронной упаковки в принятом ответе :
<bean id="asyncBatchRenewalProcessor" class="org.springframework.batch.integration.async.AsyncItemProcessor"
p:taskExecutor-ref="taskExecutor"
p:delegate-ref="batchRenewalProcessor"/>
<bean id="asyncBatchRenewalDb2ItemWriter" class="org.springframework.batch.integration.async.AsyncItemWriter"
p:delegate-ref="batchRenewalDb2ItemWriter"/>
Наш запуск log по-прежнему содержит:
22: 25: 25,252 (по умолчанию: 3) WARN org.springframework.batch.core.step.builder.FaultTolerantStepBuilder: с помощью средства чтения ItemStream обнаружен асинхронный TaskExecutor. Вероятно, это ошибка и может привести к сохранению неверных данных перезапуска.
И
22: 34: 03,755 (WorkManager.DefaultWorkManager: 0) WARN org. springframework.batch.core.step.item.ChunkMonitor: не установлен ItemReader (должен быть параллельный шаг), поэтому данные смещения игнорируются.
И несколько экземпляров:
22: 34: 03,880 (WorkManager.DefaultWorkManager: 2) WARN org.springframework.batch.core.step.item.ChunkMonitor: ItemStream был открыт в другом потоке. Данные перезапуска могут быть скомпрометированы.
Что, я полагаю, имеет некоторый смысл, потому что мы обернули Процессор и Writer, но нет класса, чтобы обернуть Reader?
Выполняю мою работу, ведение журнала указывает на то, что используются разные потоки из моего контейнера Executor, но я не пробовал этого до использования асинхронных оберток, так что, возможно, он «работал бы» даже без этого?
Так есть ли какой-нибудь способ для предотвращения ПРЕДУПРЕЖДЕНИЙ и условий, о которых они предупреждают?
Обновление : на основе этого в документации :
Если читатель не является потокобезопасным, его все же может быть эффективным использовать в вашем собственном синхронизирующем делегаторе. Вы можете синхронизировать вызов read (), и до тех пор, пока обработка и запись являются наиболее затратной частью фрагмента, ваш шаг может выполняться намного быстрее, чем в однопоточной конфигурации.
I ' Мы пробовали создать подкласс FlatFileItemReader
, который синхронизирует свой собственный метод read()
, вызывающий базовый метод read()
, но я все еще получаю сообщение WARN. Я не знаю, правильно ли я понял этот совет или то, что я сделал сейчас, безопасно, но не может быть определено как безопасное, поэтому оно все еще регистрируется.