Состояние выполнения шага подчиненного пакета Spring не обновляется в jobExecution.getStepExecutions () - PullRequest
0 голосов
/ 19 марта 2019

У меня весенний пакетный проект с настройкой задания удаленного раздела. Задание может быть завершено, но состояние задания не выполнено в объекте "BATCH_JOB_EXECUTION". Но все подчиненные завершают работу с завершенным состоянием в объекте "BATCH_STEP_EXECUTION".Я настроил (stepexecution & Jobexecutiion) прослушиватель для прослушивания статуса главного и подчиненного.Каким-то образом ведомый не обновляет свой статус до главного.

public void afterJob(JobExecution jobExecution) {
    int stepStatus = 0;
    for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
    log.debug("stepexecution:"+Collections.SingletonList(jobExecution.getStepExecutions()));
        boolean isCompleted = stepExecution.getExitStatus().getExitCode()
                .equals(ExitStatus.COMPLETED.getExitCode());
        if (!isCompleted)
            stepStatus += 1;
    }
    if (stepStatus == 0) {
        log.debug("The batch status listener" + stepStatus);
        jobExecution.setStatus(BatchStatus.COMPLETED);
    } else {
        log.debug("The batch status listener" + stepStatus);
        jobExecution.setStatus(BatchStatus.FAILED);
    }

}

Журналы: в После задания ()

[[
   StepExecution: id=5116475, version=3, name=createTable, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription=,
   StepExecution: id=5116476, version=2, name=masterStep, status=COMPLETED, exitStatus=COMPLETED, readCount=22, filterCount=0, writeCount=22 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=5, rollbackCount=0, exitDescription=, 
   StepExecution: id=5116477, version=0, name=slave:partition3, status=STARTING, exitStatus=EXECUTING, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=0, rollbackCount=0, exitDescription=,
   StepExecution: id=5116478, version=0, name=slave:partition2, status=STARTING, exitStatus=EXECUTING, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=0, rollbackCount=0, exitDescription=,
   StepExecution: id=5116480, version=0, name=slave:partition5, status=STARTING, exitStatus=EXECUTING, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=0, rollbackCount=0, exitDescription=, 
   StepExecution: id=5116479, version=0, name=slave:partition4, status=STARTING, exitStatus=EXECUTING, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=0, rollbackCount=0, exitDescription=,
   StepExecution: id=5116481, version=0, name=slave:partition1, status=STARTING, exitStatus=EXECUTING, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=0, rollbackCount=0, exitDescription=, 
   StepExecution: id=5116484, version=3, name=deleteTable, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription=
]]

Конфигурация:

    <step id="slave">
        <tasklet>
            <chunk reader="datareader" processor="dataprocessor" writer="datawriter"
                commit-interval="90" />
        </tasklet>
        <listeners>
            <listener ref="customStepListener" />
        </listeners>
    </step>

<beans:bean id="partitionHandler" class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler" init-method="afterPropertiesSet" scope="step">
        <beans:property name="messagingOperations" ref="messagingTemplate"></beans:property>
        <beans:property name="stepName" value="slave" />
        <beans:property name="gridSize" value="5" />
        <beans:property name="pollInterval" value="5000"></beans:property>
        <beans:property name="jobExplorer" ref="jobExplorer"></beans:property>
        <beans:property name="replyChannel" ref="outboundReplies"></beans:property>
    </beans:bean>

    <beans:bean id="PeriodicTrigger" class="org.springframework.scheduling.support.PeriodicTrigger">
        <beans:constructor-arg value="1"></beans:constructor-arg>
    </beans:bean> 

            <!--Simple container collecting information to describe a queue. Used in conjunction with AmqpAdmin  -->
<beans:bean id="requestQueue" class="org.springframework.amqp.core.Queue">
    <beans:constructor-arg name="name" value="ifood">
    </beans:constructor-arg>
    <beans:constructor-arg name="durable" value="false">
    </beans:constructor-arg> 
</beans:bean>

<int:poller id="PollerMetadata"  default ="true" trigger="PeriodicTrigger" task-executor="taskExecutor"></int:poller>  

<beans:bean id="amqptemplate" 
    class="org.springframework.amqp.rabbit.core.RabbitTemplate">
    <beans:property name="connectionFactory" ref="rabbitConnFactory" />
    <beans:property name="routingKey" value="ifood"/>
    <beans:property name="queue" value="ifood"/>
</beans:bean>

<beans:bean id="amqpOutboundEndpoint3" class="org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint">
    <beans:constructor-arg ref="amqptemplate"/>
    <beans:property name="expectReply" value="false"></beans:property>
    <beans:property name="routingKey" value="ifood"></beans:property>
    <beans:property name="outputChannel" ref="inboundRequests"></beans:property>
</beans:bean>


<int:service-activator ref="amqpOutboundEndpoint3" input-channel="outboundRequests" />

<beans:bean id="SimpleMessageListenerContainer" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <beans:constructor-arg ref="rabbitConnFactory"/>
    <beans:property name="queueNames" value="ifood"></beans:property>
    <beans:property name="autoStartup" value="false"></beans:property>


</beans:bean>

    <beans:beans profile="master">
    <job id="Delta.ifoodItem.Partition" restartable="true" incrementer="paramIncrementer" >

        <step id="createTable">
            <tasklet ref="createDeltaTableTasklet" />
            <next on="*" to="masterStep" />
            <next on="FAILED" to="deleteTable" />
        </step>
        <!-- master step, 5 threads (grid-size) -->
        <step id="masterStep">
            <partition step="slave" partitioner="rowNoPartitioner" handler="partitionHandler">
            <!--      <handler   "partitionHandler" grid-size="5" task-executor="taskExecutor" />    -->
            </partition>
            <end on="ABANDONED" exit-code="EARLY TERMINATION" />
            <fail on="STOPPED" exit-code="EARLY TERMINATION" />
            <next on="*" to="deleteTable" />
            <next on="FAILED" to="deleteTable" />
        </step>
        <step id="deleteTable">
            <tasklet ref="deleteTableTasklet" />
        </step>
        <listeners>
            <listener ref="customStepListener" />
        </listeners>

    </job>
</beans:beans>

    <beans:beans profile="slave">  
        <beans:bean id="AmqpInboundChannelAdapter" class="org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter"  init-method="afterPropertiesSet">
            <beans:constructor-arg ref="SimpleMessageListenerContainer"/>
            <beans:property name="outputChannel" ref="inboundRequests"></beans:property>

        </beans:bean>

        <beans:bean id="StepExecutionRequestHandler" class="org.springframework.batch.integration.partition.StepExecutionRequestHandler">
            <beans:property name="jobExplorer" ref="jobExplorer"/>
            <beans:property name="stepLocator" ref="stepLocator"/>
        </beans:bean>

        <int:service-activator ref="StepExecutionRequestHandler" input-channel="inboundRequests" output-channel="outboundStaging"/>

    </beans:beans> 

ВыполнениеРезультат

...