Spring Batch GridTaskPartition не работает для области разделителя = шаг - PullRequest
0 голосов
/ 21 ноября 2011

У меня есть следующий конфиг и код в весенней партии. Я получаю исключение.

<step id="PROCESS_FILE_TO_STAGING_TABLE_PARALLEL" next="limitDecision" >
<partition handler="partitionHandler" step="filestep" partitioner="filepartitioner" />
</step>

<bean id="partitionHandler"
class="sa.com.mobily.loader.partition.gridgain.GridGainPartitionHandler" />

<bean id="filepartitioner" class="org.springframework.batch.core.partition.support.MultiResourcePartitioner" scope="step" >
<property name="resources" value="#{dataMapprocessFiles}"/>
</bean>

код PartitionProvider

public class PartitionProvider {

private final StepExecutionSplitter stepSplitter;
private final StepExecution stepExecution;

public PartitionProvider(StepExecutionSplitter stepSplitter, StepExecution stepExecution) {
this.stepSplitter = stepSplitter;
this.stepExecution = stepExecution;
}

public String getStepName() {
return stepSplitter.getStepName();
}

public Set<StepExecution> getStepExecutions(int gridSize) throws JobExecutionException {
return stepSplitter.split(stepExecution, gridSize);
}

GridGainPartitionTask

public class GridGainPartitionTask extends GridTaskSplitAdapter<PartitionProvider, Collection<StepExecution>> {

@GridLoggerResource
private GridLogger log = null;

@Override
protected Collection<? extends GridJob> split(int gridSize, PartitionProvider stepSplit) throws GridException {

log.info("Executing steps for grid size=" + gridSize);

List<GridJob> jobs = new ArrayList<GridJob>(gridSize);

final String stepName = stepSplit.getStepName();

try {
for (final StepExecution stepExecution : stepSplit.getStepExecutions(gridSize)) {
jobs.add(new GridJobAdapterEx() {
public Serializable execute() {
RemoteStepExecutor stepExecutor = new RemoteStepExecutor("classpath:sa/com/mobily/loader/job/DataLoaderJob.xml", stepName, stepExecution);
log.info("Executing step '" + stepName + "' on this node.");
return stepExecutor.execute();
}
});
}
}
catch (JobExecutionException e) {
throw new GridException("Could not execute split step", e);
}

return jobs;
}

public Collection<StepExecution> reduce(List<GridJobResult> results) throws GridException {
Collection<StepExecution> total = new ArrayList<StepExecution>();
for (GridJobResult res : results) {
StepExecution status = res.getData();
total.add(status);
}
return total;
}

}

GridGainPartitionHandler

public class GridGainPartitionHandler extends TaskExecutorPartitionHandler {

@Autowired
@Qualifier("mscGridGain")
private Grid grid;

public Collection<StepExecution> handle(StepExecutionSplitter stepSplitter, StepExecution stepExecution) throws Exception {
PartitionProvider partitionProvider = new PartitionProvider(stepSplitter, stepExecution);
GridTaskFuture<Collection<StepExecution>> future = grid.execute(GridGainPartitionTask.class, partitionProvider );
return future.get();
}

}

RemoteStepExecutor

public class RemoteStepExecutor implements Serializable {

private Log logger = LogFactory.getLog(getClass());

private final StepExecution stepExecution;

private final String stepName;

private final String configLocation;

public RemoteStepExecutor(String configLocation, String stepName, StepExecution stepExecution) {
this.configLocation = configLocation;
this.stepName = stepName;
this.stepExecution = stepExecution;
}

public StepExecution execute() {

Step step = (Step) new ClassPathXmlApplicationContext(configLocation).getBean(stepName, Step.class);

logger.info("Spring Version: " + SpringVersion.getVersion());

try {
step.execute(stepExecution);
}
catch (JobInterruptedException e) {
stepExecution.getJobExecution().setStatus(BatchStatus.STOPPING);
throw new UnexpectedJobExecutionException("TODO: this should result in a stop", e);
}

return stepExecution;

}

public String getStepName() {
return stepName;
}

}

Исключение

2011-11-21 09:56:40,087 458939 ERROR org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:212) Encountered an error executing the step

1 Ответ

1 голос
/ 22 ноября 2011

Похоже, вы пытаетесь использовать ваш partitionHandler вне контекста шага, а именно:

  • Вы создаете новый поток (ThreadPoolExecutor / Java concurrent).
  • В новой теме вы звоните org.gridgain.grid.util.worker.GridWorker#run()
  • GridWorker, и ваши GridGainPartitionTask
  • GridGainPartitionTask пытаются использовать partitionHandler из контекста Spring.

Это не работает.partitionHandler может быть создан только внутри «шагового» контекста Spring Batch.Правильная последовательность должна быть следующей:

  • Вы запускаете Spring Batch Job (через средство запуска заданий).
  • Spring Batch получает экземпляр задания из контекста, уже инициализированного с шагом (шагами).
  • Шаг лениво создаст экземпляр разделителя (об этом позаботится контекст Spring).Обратите внимание, что этот экземпляр действителен только для шага, например, экземпляр шага должен быть раньше в стеке вызовов.
  • Затем шаг выполняет разбиение и переносит раздел задания в сетку с помощью соответствующего PartitionHandler реализация.
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...