Как запустить подчиненные приложения Spring Batch, которые используют разбиение? - PullRequest
0 голосов
/ 25 февраля 2020

Мы используем разделение Spring Batch для параллельной обработки нескольких входных файлов на двух JVM. В одной JVM работает один ведущий и один ведомый, а в другой JVM работает другой ведомый.

Запуск главного и подчиненного в первой JVM выполняется путем запуска загрузочного приложения Spring с передачей имени задания, как и при запуске любого другого пакетного задания.

Мы запускаем ведомое устройство на второй JVM, запуская загрузочное приложение Spring с именем dummyjob. У ведомого устройства нет конфигурации задания, у него есть только входящий поток для получения сообщения, stepExecutionRequestHandler и кода шага.

Результат: Все подчиненные бины успешно инициализированы подчиненный потребитель получает сообщение и запускает stepExecutionRequestHandler, который не может создать соединение с БД без какой-либо ошибки. Если я добавлю конфигурацию задания в ведомое устройство и начну задание, передавая правильное имя задания, проблема не возникнет, что заставляет меня думать, что проблема может быть связана с незапуском реального задания Spring Batch, которое должно инициализировать некоторые необходимые Ресурсы. Я проверил, что компоненты datasourceConfiguration и datasource были инициализированы, что делается как часть отдельного модуля.

Поэтому мне интересно, правильно ли я запускаю рабов или есть лучший способ их запустить.

Вот конфигурация раба:


  /*
   * Configure inbound flow (requests coming from the master)
   */

  @Bean
  public StepExecutionRequestHandler stepExecutionRequestHandler() {
    StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler();
    stepExecutionRequestHandler.setJobExplorer(jobExplorer);
    stepExecutionRequestHandler.setStepLocator(stepLocator());
    return stepExecutionRequestHandler;
  }

  @Bean
  public StepLocator stepLocator() {
    BeanFactoryStepLocator beanFactoryStepLocator = new BeanFactoryStepLocator();
    beanFactoryStepLocator.setBeanFactory(beanFactory);

    return beanFactoryStepLocator;
  }

 @Bean
  @ServiceActivator(inputChannel = "inboundRequests")
  public StepExecutionRequestHandler serviceActivator() throws Exception {
    return stepExecutionRequestHandler();
  }

  @Bean
  public MessageChannel inboundRequests() {
    return new DirectChannel();
  }

  @Bean
  public AmqpInboundChannelAdapter inbound(SimpleMessageListenerContainer listenerContainer,
                                           @Qualifier("inboundRequests") MessageChannel channel) {
    AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
    adapter.setOutputChannel(channel);
    return adapter;
  }

  @Bean
  public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container =
      new SimpleMessageListenerContainer(connectionFactory);
    container.setQueueNames(this.requestQueue);
    container.setPrefetchCount(1);

    return container;
  }

combineReleaseJobNormalStep CODE ... 

Вот конфигурация мастера:


  @Bean
  public Job combineReleaseJob() throws Exception {
    return jobBuilderFactory.get("CombineReleaseJob")
      .incrementer(new RunIdIncrementer())
      .listener(resourceLoader)
      .listener(combineReleaseJobJobContextPreparer())
      .flow(combineReleaseJobCL31401())
      .from(combineReleaseJobCL31401()).on("N").to(combineReleaseJobNormalStepManager())
      .from(combineReleaseJobCL31401()).on("R").end()
      .from(combineReleaseJobNormalStepManager()).on("COMPLETED").to(combineReleaseJobAddressTableCheck())
      .from(combineReleaseJobNormalStepManager()).on("FAILED").fail()
      .end().build();
  }

  @Bean
  public Step combineReleaseJobNormalStepManager() throws Exception {
    return stepBuilderFactory.get("combineReleaseJobNormalStep.Manager")
      .partitioner("combineReleaseJobNormalStep",partitioner())
      .partitionHandler(partitionHandler())
      .build();
  }


  @Bean
  public PartitionHandler partitionHandler() throws Exception {
    MessageChannelPartitionHandler partitionHandler = new MessageChannelPartitionHandler();

    partitionHandler.setStepName("combineReleaseJobNormalStep");
    partitionHandler.setGridSize(GRID_SIZE);
    partitionHandler.setMessagingOperations(messageTemplate());
    //partitionHandler.setPollInterval(5000l);
    partitionHandler.setJobExplorer(this.jobExplorer);

    partitionHandler.afterPropertiesSet();

    return partitionHandler;
  }

  @Bean
  public MessagingTemplate messageTemplate() {
    MessagingTemplate messagingTemplate = new MessagingTemplate(outboundRequests());

    messagingTemplate.setReceiveTimeout(60000000l);

    return messagingTemplate;
  }

  /*
   * Configure outbound flow (requests going to slaves)
   */
  @Bean
  public MessageChannel outboundRequests() {
    return new DirectChannel();
  }

  @Bean
  public IntegrationFlow outboundFlow(AmqpTemplate amqpTemplate) {
    return IntegrationFlows
      .from(outboundRequests())
      .handle(Amqp.outboundAdapter(amqpTemplate).routingKey(this.requestQueue))
      .get();
  }

Ответы [ 2 ]

1 голос
/ 25 февраля 2020

Если я добавлю конфигурацию задания в ведомое устройство и начну задание, передавая правильное имя задания, проблема не произойдет, что заставляет меня думать, что проблема может быть связана с незапуском реального задания Spring Batch,

Вам не нужно запускать целое задание Spring Batch на рабочей стороне. Задание обычно запускается на стороне мастера, и на стороне работника требуются только рабочие шаги. Пожалуйста, обратитесь к разделу Удаленное разбиение справочной документации.

Как запустить подчиненные приложения Spring Batch, использующего разбиение?

Рабочие могут быть запущены как обычное приложение Spring (загрузочное), где StepExecutionRequestHandler (обычно настроенный как активатор службы Spring Integration) прослушивает входящие StepExecutionRequest s и выполняет рабочий шаг (расположенный с StepLocator).

Вы можете найти полный набор примеров в докладе Высокопроизводительная пакетная обработка , который я совместно представил Майклу на SpringOne 2018. Исходный код примеров можно найти здесь: https://github.com/mminella/scaling-demos/tree/sp1-2018

0 голосов
/ 27 февраля 2020

Мы обнаружили, что запускаем ведомое устройство как пакетное задание, поскольку оно было первоначально добавлено в то же пакетное приложение, что и ведущее. Имя задания было обязательным параметром в пакетном приложении, и оно имело много зависимостей от пакетных инструментов и jar-файлов Spring, поэтому каким-то образом оно пыталось запустить пакетное задание, и поскольку конфигурация задания не существует, в середине произойдет сбой. обработки и выключить все компоненты и ресурсы, вызывая проблему с подключением к БД, описанную выше. Когда мы запускаем ведомое устройство как приложение deamon или обычное приложение с весенней загрузкой, оно запускается нормально и выполняет шаг до завершения.

Чтобы избежать больших переделок и удалить все пакетные зависимости из приложения, чтобы оно работало как обычное загрузочное приложение Spring, мы использовали этот код, чтобы он работал как демон:

@SpringBootApplication
@IntegrationComponentScan
public class Application implements CommandLineRunner {

  public static void main(String[] args) {
    System.exit(SpringApplication.exit(SpringApplication.run(Application.class, args)));
  }

  @Override
  public void run(String... args) throws Exception {
    System.out.println("Joining thread, you can press Ctrl+C to shutdown application");
    Thread.currentThread().join();
  }

}

``

...