Мы используем разделение Spring Batch для параллельной обработки нескольких входных файлов на двух JVM. В одной JVM работает один ведущий и один ведомый, а в другой JVM работает другой ведомый.
Запуск главного и подчиненного в первой JVM выполняется путем запуска загрузочного приложения Spring с передачей имени задания, как и при запуске любого другого пакетного задания.
Мы запускаем ведомое устройство на второй JVM, запуская загрузочное приложение Spring с именем dummyjob. У ведомого устройства нет конфигурации задания, у него есть только входящий поток для получения сообщения, stepExecutionRequestHandler и кода шага.
Результат: Все подчиненные бины успешно инициализированы подчиненный потребитель получает сообщение и запускает stepExecutionRequestHandler, который не может создать соединение с БД без какой-либо ошибки. Если я добавлю конфигурацию задания в ведомое устройство и начну задание, передавая правильное имя задания, проблема не возникнет, что заставляет меня думать, что проблема может быть связана с незапуском реального задания Spring Batch, которое должно инициализировать некоторые необходимые Ресурсы. Я проверил, что компоненты datasourceConfiguration и datasource были инициализированы, что делается как часть отдельного модуля.
Поэтому мне интересно, правильно ли я запускаю рабов или есть лучший способ их запустить.
Вот конфигурация раба:
/*
* Configure inbound flow (requests coming from the master)
*/
@Bean
public StepExecutionRequestHandler stepExecutionRequestHandler() {
StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler();
stepExecutionRequestHandler.setJobExplorer(jobExplorer);
stepExecutionRequestHandler.setStepLocator(stepLocator());
return stepExecutionRequestHandler;
}
@Bean
public StepLocator stepLocator() {
BeanFactoryStepLocator beanFactoryStepLocator = new BeanFactoryStepLocator();
beanFactoryStepLocator.setBeanFactory(beanFactory);
return beanFactoryStepLocator;
}
@Bean
@ServiceActivator(inputChannel = "inboundRequests")
public StepExecutionRequestHandler serviceActivator() throws Exception {
return stepExecutionRequestHandler();
}
@Bean
public MessageChannel inboundRequests() {
return new DirectChannel();
}
@Bean
public AmqpInboundChannelAdapter inbound(SimpleMessageListenerContainer listenerContainer,
@Qualifier("inboundRequests") MessageChannel channel) {
AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
adapter.setOutputChannel(channel);
return adapter;
}
@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container =
new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames(this.requestQueue);
container.setPrefetchCount(1);
return container;
}
combineReleaseJobNormalStep CODE ...
Вот конфигурация мастера:
@Bean
public Job combineReleaseJob() throws Exception {
return jobBuilderFactory.get("CombineReleaseJob")
.incrementer(new RunIdIncrementer())
.listener(resourceLoader)
.listener(combineReleaseJobJobContextPreparer())
.flow(combineReleaseJobCL31401())
.from(combineReleaseJobCL31401()).on("N").to(combineReleaseJobNormalStepManager())
.from(combineReleaseJobCL31401()).on("R").end()
.from(combineReleaseJobNormalStepManager()).on("COMPLETED").to(combineReleaseJobAddressTableCheck())
.from(combineReleaseJobNormalStepManager()).on("FAILED").fail()
.end().build();
}
@Bean
public Step combineReleaseJobNormalStepManager() throws Exception {
return stepBuilderFactory.get("combineReleaseJobNormalStep.Manager")
.partitioner("combineReleaseJobNormalStep",partitioner())
.partitionHandler(partitionHandler())
.build();
}
@Bean
public PartitionHandler partitionHandler() throws Exception {
MessageChannelPartitionHandler partitionHandler = new MessageChannelPartitionHandler();
partitionHandler.setStepName("combineReleaseJobNormalStep");
partitionHandler.setGridSize(GRID_SIZE);
partitionHandler.setMessagingOperations(messageTemplate());
//partitionHandler.setPollInterval(5000l);
partitionHandler.setJobExplorer(this.jobExplorer);
partitionHandler.afterPropertiesSet();
return partitionHandler;
}
@Bean
public MessagingTemplate messageTemplate() {
MessagingTemplate messagingTemplate = new MessagingTemplate(outboundRequests());
messagingTemplate.setReceiveTimeout(60000000l);
return messagingTemplate;
}
/*
* Configure outbound flow (requests going to slaves)
*/
@Bean
public MessageChannel outboundRequests() {
return new DirectChannel();
}
@Bean
public IntegrationFlow outboundFlow(AmqpTemplate amqpTemplate) {
return IntegrationFlows
.from(outboundRequests())
.handle(Amqp.outboundAdapter(amqpTemplate).routingKey(this.requestQueue))
.get();
}