Интеграция Spring Batch. Как сделать удаленные шаги, внешние по отношению к заданию, видимыми для StepLocator в JVM удаленного выполнения? - PullRequest
0 голосов
/ 23 апреля 2019

Мои задания настроены для удаленного разделения, я отправляю сообщение через kafka, подчиненная виртуальная машина получает запросы и пытается запустить шаг, но не может найти шаг.

Похоже, что beanFactory (applicationContext), который я предоставляю, не содержит шаг удаленного выполнения.

Я несколько раз просматривал пример на https://www.youtube.com/watch?v=CYTj5YT7CZU, и мне не хватает правильногоbeanFactory идентифицируется и устанавливается для подчиненных заданий.

Во время инициализации приложения ClasspathXmlApplicationContextsFactoryBean загружается из библиотеки заданий, которая содержит XML для всех весенних пакетных заданий и шагов удаленного ведомого.Этот компонент содержит отдельный ресурс для каждого XML-файла, включая подчиненные файлы (как и ожидалось), и назначен для applicationContext.

@Bean
public ClasspathXmlApplicationContextsFactoryBean classpathXmlApplicationContextsFactoryBean () throws IOException
{
    String resourcePath = configPropertiesService
            .fetchPropertyValue(PropertyValueConstants.MXARCHIVE_SKELETON_LOCATION,
                    PropertyValueConstants.MXARCHIVE_SKELETON_LOCATION_DEFAULT)
            .getValue() + "*.xml";

    logger.trace("classpathXmlApplicationContextsFactoryBean()  :: {} ", resourcePath);
    Resource[] resources = applicationContext.getResources(resourcePath);

    ClasspathXmlApplicationContextsFactoryBean bean = new ClasspathXmlApplicationContextsFactoryBean ();
    bean.setApplicationContext(applicationContext);
    bean.setResources(resources);

    return bean;
}

BeanFactoryStepLocator инициализируется с помощью applicationContext.

@Bean
public BeanFactoryStepLocator stepLocator() throws Exception
{
    BeanFactoryStepLocator stepLocator = new BeanFactoryStepLocator ();

/* applicationContext is autowired in the postconstruct - what we do here doesn't matter
    stepLocator.setBeanFactory(applicationContext); // TODO: Find the right factory
    */
    return stepLocator;
};

// EDIT: added method to code listing so the calling method is visible
@Bean
public StepExecutionRequestHandler stepExecutionRequestHandler() throws Exception {
    StepExecutionRequestHandler result = new StepExecutionRequestHandler();
    ;

    result.setJobExplorer(jobExplorer);
    BeanFactoryStepLocator stepLocator = stepLocator ();
    result.setStepLocator(stepLocator);
    return result;
}

При выполнении каркас пытается вызвать шаг и завершается неудачно, потому что шаг не представлен в контексте приложения (beanFactory).

Thread-7 2019-04-23 12:47:30,343 INFO  c.m.m.s.DefaultBatchConfigurer - ThreadPoolTaskExecutor status => Active threads :: 1, Total threads ::30, 3.3333333333333335 % Active.
executionContainer-C-1 2019-04-23 12:47:30,966 ERROR o.s.k.l.LoggingErrorHandler - Error while processing: ConsumerRecord(topic = job.step, partition = 0, offset = 47, CreateTime = 1556041503162, serialized key size = 4, serialized value size = 78, headers = RecordHeaders(headers = [RecordHeader(key = sequenceNumber, value = [49, 49]), RecordHeader(key = correlationId, value = [34, 49, 55, 57, 49, 58, 102, 115, 45, 115, 116, 101, 112, 48, 48, 48, 50, 46, 115, 108, 97, 118, 101, 34]), RecordHeader(key = sequenceSize, value = [49, 50]), RecordHeader(key = spring_json_header_types, value = [123, 34, 115, 101, 113, 117, 101, 110, 99, 101, 78, 117, 109, 98, 101, 114, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 73, 110, 116, 101, 103, 101, 114, 34, 44, 34, 115, 101, 113, 117, 101, 110, 99, 101, 83, 105, 122, 101, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 73, 110, 116, 101, 103, 101, 114, 34, 44, 34, 99, 111, 114, 114, 101, 108, 97, 116, 105, 111, 110, 73, 100, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 125])], isReadOnly = false), key = 11, value = StepExecutionRequest: [jobExecutionId=1791, stepExecutionId=21396, stepName=fs-step0002.slave])
org.springframework.messaging.MessageHandlingException: nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: No bean named 'fs-step0002.slave' available
    at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:109) ~[spring-integration-core-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:93) ~[spring-integration-core-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123) ~[spring-integration-core-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:169) ~[spring-integration-core-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132) ~[spring-integration-core-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) ~[spring-integration-core-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:461) ~[spring-integration-core-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401) ~[spring-integration-core-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:205) ~[spring-integration-core-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:369) ~[spring-integration-kafka-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$400(KafkaMessageDrivenChannelAdapter.java:74) ~[spring-integration-kafka-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:431) ~[spring-integration-kafka-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:402) ~[spring-integration-kafka-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1224) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1217) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1178) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1159) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1099) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:934) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:750) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_162]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_162]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_162]
Caused by: org.springframework.beans.factory.NoSuchBeanDefinitionException: No bean named 'fs-step0002.slave' available
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.getBeanDefinition(DefaultListableBeanFactory.java:772) ~[spring-beans-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.beans.factory.support.AbstractBeanFactory.getMergedLocalBeanDefinition(AbstractBeanFactory.java:1221) ~[spring-beans-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:294) ~[spring-beans-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:204) ~[spring-beans-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.batch.integration.partition.BeanFactoryStepLocator.getStep(BeanFactoryStepLocator.java:34) ~[spring-batch-integration-4.1.1.RELEASE.jar:4.1.1.RELEASE]
    at org.springframework.batch.integration.partition.StepExecutionRequestHandler.handle(StepExecutionRequestHandler.java:58) ~[spring-batch-integration-4.1.1.RELEASE.jar:4.1.1.RELEASE]
    at sun.reflect.GeneratedMethodAccessor687.invoke(Unknown Source) ~[na:na]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_162]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_162]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:170) ~[spring-messaging-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120) ~[spring-messaging-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.integration.handler.support.MessagingMethodInvokerHelper$HandlerMethod.invoke(MessagingMethodInvokerHelper.java:1119) ~[spring-integration-core-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.invokeHandlerMethod(MessagingMethodInvokerHelper.java:617) ~[spring-integration-core-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:490) ~[spring-integration-core-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:313) ~[spring-integration-core-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:106) ~[spring-integration-core-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    ... 29 common frames omitted

XML шага (очищенный):

<?xml version="1.0" encoding="UTF-8"?>
<beans default-lazy-init="false" > <!-- usual suspects -->

    <bean class="org.springframework.batch.core.scope.JobScope">
        <property name="proxyTargetClass" value="true" />
    </bean>

    <bean class="org.springframework.batch.core.scope.StepScope">
        <property name="proxyTargetClass" value="true" />
    </bean>

    <batch:step id="fs-step0002.slave">
        <batch:tasklet transaction-manager="jtaTransactionManager"
            start-limit="100">
            <batch:chunk reader="myCustomItemReader" writer="myCustomWriter"
                commit-interval="1"/>

        </batch:tasklet>
        <batch:fail on="FAILED" />
    </batch:step>
</beans>

РЕДАКТИРОВАТЬ 1:

Я принудительно создал новый ClasspathXmlApplicationContextsFactoryBean, который читает только файл удаленного шага (и только один из них).Он по-прежнему жалуется на то, что нет бина с ожидаемым именем, но я могу видеть, что он определенно там.

Позже, когда я прослеживаю в steplocator, я вижу, что beanFactory пуст.

Обходной код для тестирования:

@Bean
public ClasspathXmlApplicationContextsFactoryBean stepFactoryBean() throws IOException {
    String resourcePath = configPropertiesService
            .fetchPropertyValue(PropertyValueConstants.MXARCHIVE_SKELETON_LOCATION,
                    PropertyValueConstants.MXARCHIVE_SKELETON_LOCATION_DEFAULT)
            .getValue() + "*slave.xml";

    logger.trace("classpathXmlApplicationContextsFactoryBean()  :: {} ", resourcePath);
    Resource[] resources = applicationContext.getResources(resourcePath);

    ClasspathXmlApplicationContextsFactoryBean bean = new ClasspathXmlApplicationContextsFactoryBean();
    bean.setApplicationContext(applicationContext);
    bean.setResources(resources);
    return bean;
}

@Bean
public BeanFactoryStepLocator stepLocator() throws Exception {

    BeanFactoryStepLocator stepLocator = new BeanFactoryStepLocator();
    ApplicationContextFactory[] factories = stepFactoryBean().getObject();
    if (factories != null) {         
        stepLocator.setBeanFactory(factories[0].createApplicationContext().getBeanFactory());
    }

    return stepLocator;
};

Представление отладчика beanFactory в stepLocator во время создания.

Debugger view of beanFactory within the stepLocator

Отладчик Просмотр результатов getStep () в конце метода фабрики stepLocator ():

Debugger View of results of getStep() at end of stepLocator() factory method

РЕДАКТИРОВАТЬ 2:

Похоже, что оригинальный "applicationContext" автоматически подключается как beanfactory, поэтому реальная проблема заключается в загрузке дополнительных bean-компонентов в applicationContext.

Я также попробовал решения, предложенные на , какзагрузить дополнительный файл конфигурации bean весной во время выполнения и связанные вопросы.

1 Ответ

0 голосов
/ 25 апреля 2019

Вы можете попытаться объявить необходимую зависимость для BeanFactory и позволить Spring дать правильный экземпляр вместо того, чтобы вы выполняли поиск / внедрение вручную. Так что вместо:

@Bean
public BeanFactoryStepLocator stepLocator() throws Exception {
   BeanFactoryStepLocator stepLocator = new BeanFactoryStepLocator();
   ApplicationContextFactory[] factories = stepFactoryBean().getObject();
   if (factories != null) {         
      stepLocator.setBeanFactory(factories[0].createApplicationContext().getBeanFactory());
   }
   return stepLocator;
};

используйте что-то вроде:

@Bean
public StepLocator stepLocator(BeanFactory beanFactory) {
    BeanFactoryStepLocator beanFactoryStepLocator = new BeanFactoryStepLocator();
    beanFactoryStepLocator.setBeanFactory(beanFactory);
    return beanFactoryStepLocator;
}

Надеюсь, это поможет.

...