Вот как я окончательно спроектировал решение:
Итак, я перестроил весь поток от подхода, основанного на тасклетах, до подхода, основанного на оркестрированном чанке.
Задание будет иметь 1 шаг named - fetchProcessAndWriteData.
jobBuilderFactory.get("allChunkJob")
.start(step("fetchProcessAndWriteData"))
.next(step("updatePostJobRunDetails"))
.build()
fetchProcessAndWriteData: будет иметь ридер, masterProcessor и masterWriter с размером фрагмента 10000.
steps
.get("fetchProcessAndWriteData")
.chunk(BATCHSIZE)
.reader(chunkReader)
.processecor(masterProcessor)
.writer(masterWriter)
.listener(listener())
.build()
chunkReader - считывать данные в фрагменты из курсора базы данных и передать их masterProcessor.
masterProcessor - принимает данные по одной и передает записи всем другим процессорам - P1, P2, P3, P4 и сохраняет обработанные данные в CompositeResultBean.
CompositeResultBean состоит из держателей данных для всех 4 типов записей.
List<Record> recordType1.
List<Record> recordType2.
List<Record> recordType3.
List<Record> recordType4.
Этот компонент затем возвращается из метода процесса masterProcessor.
public Object process(Object item){
..
bean.setRecordType1(P1.process(item));
bean.setRecordType2(P2.process(item));
bean.setRecordType3(P3.process(item));
bean.setRecordType4(P4.process(item));
return bean;
}
masterWriter - на этом шаге принимается список записей, т. Е. Список составных элементов здесь. Выполните итерацию в списке bean-компонента и вызовите соответствующий метод записи W1, W2, W3, W4 writer () с данными, содержащимися в каждом из композитных атрибутов композитного текста.
publi c void write (List list) {
list.forEach(record -> {
W1.write(isInitialBatch,list.getRecordType1());
W2.write(isInitialBatch,list.getRecordType2());
W3.write(isInitialBatch,list.getRecordType3());
W4.write(isInitialBatch,list.getRecordType4());
});
}
Все эти шаги выполняются в виде пакета из 10 тыс. Записей и записывают данные в файл.
Еще одна проблема, с которой я столкнулся при записи файла, заключалась в том, что мне пришлось бы заменить Уже существующий файл в первый раз записи записывается, но для более поздних необходимо добавить в тот же файл. Я решил эту проблему, переопределив chunkListener в masterWriter - где я извлек пакет # и установил флаг stati c isInitialBatch , по умолчанию равный TRUE. Эта переменная устанавливается внутри
beforeChunk()
if chunkContext.getStepContext().getStepExecution().getCommitCount()==0 as TRUE , else FALSE .
То же логическое значение передается в FileWriter, который открывает файл в режиме добавления - TRUE или FALSE.
W1.write(isInitialBatch,list.getRecordType1());