Удаленный раздел - раб становится жадным - PullRequest
0 голосов
/ 13 июня 2018

Следующее - это то, чего мы пытаемся достичь.

Мы хотим, чтобы большой XML-файл размещался в базе данных параллельно в разных vms.Чтобы добиться этого, мы используем подход с масштабируемым удаленным разделением на основе пружинных пакетов и сталкиваемся с некоторыми проблемами.Ниже приведена высокоуровневая настройка

  • master - разбивает XML-файл на несколько разделов (в настоящее время размер сетки равен 3).
  • slave 1 - обработка разделов (читает разделы на основе индекса и запись в БД)
  • slave 2 - обработка разделов

    Мы работаем в Linux и с активнымиMQ 5.15.3.

При вышеуказанной настройке

  • подчиненное устройство 1 обрабатывает 2 раздела одновременно

  • slave 2 обрабатывает 1 раздел.

Ведущий не ожидает завершения работы всех ведомых устройств и переходит в неизвестное состояние.

org.springframework.batch.core.JobExecutionException: Partition handler returned an unsuccessful step
at org.springframework.batch.core.partition.support.PartitionStep.doExecute(PartitionStep.java:112)
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:200)
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148)

если у нас размер сетки равен 2, то раб 1 выбирает их обоих, а раб 2 не получает ни одного из них (следовательно, жадный раб).Ведомый 1 обрабатывает их параллельно, но задание переходит в неизвестное состояние.

Ниже приведены наши вопросы

  • Как мы можем запретить ведомому устройству 1 параллельно обрабатывать два раздела - мы попытались установить предварительную выборку равной 0 и 1 [ссылка] http://activemq.apache.org/what-is-the-prefetch-limit-for.html [ссылка], но это не сработало.Как заставить рабов обрабатывать один раздел за раз?
  • Почему хозяин не ждет всех рабов?

Ниже приведена наша конфигурация

Основная конфигурация

 <?xml version="1.0" encoding="UTF-8"?>
    <step id="remotePartitionStagingStep">
     <partition partitioner="xmlPartitioner" handler="partitionStagingHandler"/>
       <listeners>
         <listener ref="jobListener" />
       </listeners>
     </step>
     <!-- XML Partitioner starts here -->
     <beans:bean id="xmlPartitioner" class="XMLPartitioner" scope="step">           <beans:property name="partitionThreadName" value="ImportXMLPartition-"/>
      <beans:property name="resource" value="file:///#{jobParameters[ImportFileProcessed]}"/>
            <beans:property name="rootElementNode" value="#{jobParameters[ImportFragmentRootNameWithPrefix]}"/>
        </beans:bean>
       <beans:bean id="partitionStagingHandler" class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
            <beans:property name="stepName" value="slave.ImportStagingStep"/>
            <beans:property name="gridSize" value="3"/>
            <beans:property name="replyChannel" ref="aggregatedReplyChannel"/>
            <beans:property name="jobExplorer" ref="jobExplorer"/>
            <beans:property name="messagingOperations">
                <beans:bean class="org.springframework.integration.core.MessagingTemplate">
                    <beans:property name="defaultChannel" ref="requestsChannel"/>
                    <beans:property name="receiveTimeout" value="${batch.gateway.receiveTimeout}" />  //360000 is the current value
                </beans:bean>
            </beans:property>
        </beans:bean>
         <int:aggregator ref="partitionStagingHandler"
                        input-channel="replyChannel"
                        output-channel="aggregatedReplyChannel"
                        send-timeout="${batch.gateway.receiveTimeout}"
                        expire-groups-upon-timeout="true"/>
         <int:channel id="requestsChannel">
            <int:interceptors>
                <int:wire-tap channel="logChannel"/>
            </int:interceptors>
        </int:channel>
         <int-jms:outbound-channel-adapter connection-factory="connectionFactory"
                                          channel="requestsChannel"
                                          destination-name="requestsQueue"/>
        <int:channel id="aggregatedReplyChannel">
            <int:queue/>
        </int:channel>
         <int:channel id="replyChannel">
            <int:interceptors>
                <int:wire-tap channel="logChannel"/>
            </int:interceptors>
        </int:channel>
         <int-jms:message-driven-channel-adapter connection-factory="connectionFactory"
                                                channel="replyChannel"
                                                error-channel="errorChannel"
                                                destination-name="replyQueue"/>

Конфигурация ведомого

    <step id="slave.ImportStagingStep">
    <tasklet transaction-manager="transactionManager">
    <chunk reader="StagingSpecificItemReader" processor="chainedStagingProcessor" writer="StagingItemWriter"
             commit-interval="${import.CommitInterval}" skip-limit="${import.skipLimit}" retry-policy="neverRetryPolicy">
     <streams>
      <stream ref="errorFlatFileRecordWriter"/>
     </streams>
     <skippable-exception-classes>
       <include class="java.lang.Exception"/>
       <exclude class="org.springframework.oxm.UnmarshallingFailureException"/>
       <exclude class="java.lang.Error"/>
     </skippable-exception-classes>
     </chunk>
     <listeners>
        <listener ref="stepExceptionListener"/>
        <listener ref="stagingListener"/>
     </listeners>
       </tasklet>
       <listeners>
          <listener ref="slaveStepExecutionListener"/>
       </listeners>
      </step>
        <beans:bean id="slaveStepExecutionListener" class="StepExecutionListener"></beans:bean>
        <!-- JMS config for staging step starts here -->
        <int:channel id="replyChannel">
            <int:interceptors>
                <int:wire-tap channel="logChannel"/>
            </int:interceptors>
        </int:channel>
            <int:channel id="requestChannel">
            <int:interceptors>
                <int:wire-tap channel="logChannel"/>
            </int:interceptors>
        </int:channel>
        <int-jms:message-driven-channel-adapter connection-factory="connectionFactory"
                                                destination-name="requestsQueue"
                                                error-channel="errorChannel"
                                                channel="requestsChannel"/>

        <int-jms:outbound-channel-adapter connection-factory="connectionFactory"
                                          destination-name="replyQueue"
                                          channel="replyChannel"/>

        <int:service-activator input-channel="requestsChannel"
                               output-channel="replyChannel"
                               ref="stepExecutionRequestHandler"/>

        <!-- JMS config for staging step ends here -->

        <!-- The logChannel is configured as an interceptor to channels so that messages are logged. -->
        <int:logging-channel-adapter auto-startup="true" log-full-message="true" id="logChannel" level="INFO"/>

        <int:channel id="errorChannel" />

        <int:service-activator input-channel="errorChannel" method="handleException">
          <beans:bean class="ErrorHandler" />
        </int:service-activator>

1 Ответ

0 голосов
/ 15 июня 2018

Нашли проблему, которая у нас была.

Подчиненное устройство дважды включало определение задания пакетной пружины, в результате чего число потребителей было зарегистрировано дважды для каждого подчиненного устройства.Я думаю, что нам не хватает некоторой конфигурации для маршрутизации сообщений обратно от ведомого устройства к ведущему, когда у нас есть два раздела, работающие одновременно на ведомом устройстве правильно, так что в этот момент, пока у нас есть один раздел, идущий к одному ведомому устройству (мы многопоточны нараб) у нас все хорошо.

...