У меня есть работа с многопоточным ориентированным на куски шагом , и мне нужно посчитать, сколько написанных элементов удовлетворяет некоторым бизнес-правилам. (PS: по старым причинам я использую Spring Batch 3.0.x)
Я должен помнить, что если произойдет откат, то предыдущие уже подсчитанные элементы в той же транзакции (то есть тот же кусок) должны быть проигнорированным Поэтому я не могу просто обновить JobExecutionContext прямо из Writer, я обновляю атрибут в ChunkContext и использую CustomChunkListener , чтобы обновлять JobExecutionContext только после успешного завершения фрагмента (как вы можете видеть в коде ниже).
Перед тем, как сделать шаг многопоточным, у меня была следующая реализация, которая работала как ожидалось (я максимально упростил код, чтобы сосредоточиться на проблеме):
CustomItemWriter
public class CustomItemWriter implements ItemWriter<String[]> {
private ChunkContext chunkContext;
@Override
public void write(List<? extends String[]> items) throws Exception {
for (String[] item : items) {
((AtomicLong)this.chunkContext.getAttribute("chunkCounter")).incrementAndGet();
}
}
@BeforeChunk
private void beforeChunk(ChunkContext chunkContext) {
this.chunkContext = chunkContext;
}
}
CustomChunkListener
public class CustomChunkListener extends ChunkListenerSupport {
@Override
public void beforeChunk(ChunkContext context) {
context.setAttribute("chunkCounter", new AtomicLong());
}
@Override
public void afterChunk(ChunkContext context) {
((AtomicLong)context.getStepContext().getJobExecutionContext().get("jobCounter")).addAndGet(((AtomicLong)context.getAttribute("chunkCounter")).get());
}
}
CustomJobListener
public class CustomJobListener extends JobExecutionListenerSupport {
@Override
public void beforeJob(JobExecution jobExecution) {
jobExecution.getExecutionContext().put("jobCounter", new AtomicLong());
}
@Override
public void afterJob(JobExecution jobExecution) {
System.out.println("jobCounter = " + jobExecution.getExecutionContext().get("jobCounter"));
}
}
Однако при настройке задания для запуска шаг в многопоточном режиме, счетчик не обновлялся должным образом, и я знаю, что это было из-за способа, которым я получал доступ к ChunkContext в CustomItemWriter.
bean-компонент CustomItemWriter имеет " step scope »(насколько я знаю,« chunk scope »не доступен), поэтому каждый раз, когда поток запускает новый ChunkContext, метод beforeChunk в CustomItemWriter перезаписывал предыдущий ChunkContext и все испортил (ранее подсчитанный затем пропал, так как я потерял ссылку на предыдущие экземпляры ChunkContext).
Итак, мне удалось исправить проблему с помощью ThreadLocal, как показано ниже :
CustomItemWriter (v2)
public class CustomItemWriter implements ItemWriter<String[]> {
private ThreadLocal<ChunkContext> chunkContext = new ThreadLocal<ChunkContext>();
@Override
public void write(List<? extends String[]> items) throws Exception {
for (String[] item : items) {
((AtomicLong)this.chunkContext.get().getAttribute("chunkCounter")).incrementAndGet();
}
}
@BeforeChunk
private void beforeChunk(ChunkContext chunkContext) {
this.chunkContext.set(chunkContext);
}
}
Хотя мне удалось решить проблему, мне интересно, есть ли лучший способ получить доступ к текущему ChunkContext (для текущего потока) из CustomItemWriter. Есть ли способ получить это программно? Чтобы сделать это «по-весеннему», возможно, в новых версиях Spring Batch должна быть реализована [новая] Chunk Scope?
PS: Кроме того, хотя проблема решена, я подумал, что было бы полезно написать этот вопрос, так что он может помочь кому-то с такими же потребностями.