Почему политика повторных попыток не работает для сплиттера весенней интеграции? - PullRequest
0 голосов
/ 27 сентября 2018

Политика повторных попыток не работает

<task:executor id="ticketSourceExc" pool-size="1"
        queue-capacity="0" rejection-policy="CALLER_RUNS" />
    <task:executor id="regulatoryExc" pool-size="1"
        queue-capacity="5" rejection-policy="CALLER_RUNS" />
   <int:service-activator input-channel="ticketCacheChannel"
        output-channel="sourceTicketsSplitter" ref="ticketSerActivator"
        method="fetchDataFromDB">
        <int:poller fixed-rate="10" task-executor="ticketSourceExc" />
    </int:service-activator>

    <!-- I am getting List<Tickets> -->
    <int:splitter input-channel="sourceTicketsSplitter"
        output-channel="sourceTicketChannel">
        <int:request-handler-advice-chain>
            <ref bean="retrier" />
        </int:request-handler-advice-chain>
        <int:poller fixed-rate="10" />
    </int:splitter>

    <int:handler-retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel" >
     <int:exponential-back-off initial="1000" multiplier="5" maximum="6000"/>
    </int:handler-retry-advice>

    <int:bridge id="regulatoryBrigde" input-channel="sourceTicketChannel"
        output-channel="regulatoryChannel">
        <int:poller fixed-rate="10" task-executor="regulatoryExc" />
    </int:bridge>

    <int:chain id="regulatoryChainFlow" input-channel="regulatoryChannel">
        <int:service-activator ref="regulatoryTaskActivator1"
            method="process" />
        <int:service-activator ref="regulatoryTaskActivator2"
            method="process" />
        <int:service-activator ref="regulatoryTaskActivator3"
            method="process" />
    </int:chain>

Я настроил политику повторных попыток для Splitter.Выходной канал - это канал очереди глубиной 1 (для целей тестирования).Я ожидаю, что когда я получу список из 10 элементов, он должен перейти на канал исключений после того, как очередь заполнится. Так не происходит. Я поместил спящий поток в нормативныйTaskActivator1, чтобы заблокировать очередь

1 Ответ

0 голосов
/ 28 сентября 2018

Это верно, потому что отправка на выходной канал не входит в совет.Это только для запроса части.Пожалуйста, внимательно прочитайте документы по этому вопросу: https://docs.spring.io/spring-integration/docs/5.0.8.RELEASE/reference/html/messaging-endpoints-chapter.html#message-handler-advice-chain. Рекомендация применяется только для метода handleRequestMessage().

ОБНОВЛЕНИЕ

Для случая использования, когда вы хотите обработать ограниченный размер очереди и выполнить некоторую обработку ошибок с повторной попыткой, я бы порекомендовал вывести разделениеприводит к service-activator вокруг @MessagingGateway с соответствующим error-channel и возможным @Retryable для @Gateway метода:

<splitter input-channel="sourceTicketsSplitter"
    output-channel="gatewayInputChannel">
       <poller fixed-rate="10" />
</splitter>

<service-activator input-channel="gatewayInputChannel" ref="gateway">
   <request-handler-advice-chain>
        <ref bean="retrier" />
    </request-handler-advice-chain>  
</service-activator>

<gateway id="gateway" default-request-channel="sourceTicketChannel"/>

Если ваш поток односторонний и вы не ожидаете никаких ответов от regulatoryChainFlow, тогда значение по умолчанию RequestReplyExchanger не соответствует вашим требованиям, и вам необходимо ввести простой интерфейс для шлюза с помощью метода void и настроить его <gateway>для этого как service-interface.

ОБНОВЛЕНИЕ

То, что я забыл упомянуть для вас, что необходимо использовать timeout для отправки ограниченномуочередь.Если такого тайм-аута нет, отправитель просто садится и ждет номер, который появляется там после упомянутых Thread.sleep(10000);.

Итак, вам нужно вот что:

<int:gateway id="stateGateWay" default-request-channel="stateChannel" 
    default-request-timeout="100"
    service-interface="com.biswo.myspringapp.gateway.StateGateWay" />

Обратите внимание на:

 <xsd:attribute name="default-request-timeout" type="xsd:string">
            <xsd:annotation>
                <xsd:documentation>
                        <![CDATA[
                Provides the amount of time dispatcher would wait to send a message.
                This timeout would only apply if there is a potential to block in the send call.
                For example if this gateway is hooked up to a Queue channel. 
                Value is specified in milliseconds; it can be a simple long value or a SpEL
                expression; array variable #args is available.
                        ]]>
                </xsd:documentation>
            </xsd:annotation>
        </xsd:attribute>

После этого я начал видеть остатки в журналах, а затем Dispatcher has no subscribers:

2018-10-09 14:53:03.466 TRACE 15808 --- [ask-scheduler-3] o.s.retry.support.RetryTemplate          : RetryContext retrieved: [RetryContext: count=0, lastException=null, exhausted=false]
2018-10-09 14:53:03.466 DEBUG 15808 --- [ask-scheduler-3] o.s.retry.support.RetryTemplate          : Retry: count=0
2018-10-09 14:53:03.568 DEBUG 15808 --- [ask-scheduler-3] o.s.r.backoff.ExponentialBackOffPolicy   : Sleeping for 100
2018-10-09 14:53:03.669 DEBUG 15808 --- [ask-scheduler-3] o.s.retry.support.RetryTemplate          : Checking for rethrow: count=1
2018-10-09 14:53:03.669 DEBUG 15808 --- [ask-scheduler-3] o.s.retry.support.RetryTemplate          : Retry: count=1
2018-10-09 14:53:03.771 DEBUG 15808 --- [ask-scheduler-3] o.s.r.backoff.ExponentialBackOffPolicy   : Sleeping for 500
2018-10-09 14:53:04.271 DEBUG 15808 --- [ask-scheduler-3] o.s.retry.support.RetryTemplate          : Checking for rethrow: count=2
2018-10-09 14:53:04.271 DEBUG 15808 --- [ask-scheduler-3] o.s.retry.support.RetryTemplate          : Retry: count=2
2018-10-09 14:53:04.372 DEBUG 15808 --- [ask-scheduler-3] o.s.r.backoff.ExponentialBackOffPolicy   : Sleeping for 600
2018-10-09 14:53:04.973 DEBUG 15808 --- [ask-scheduler-3] o.s.retry.support.RetryTemplate          : Checking for rethrow: count=3
2018-10-09 14:53:04.973 DEBUG 15808 --- [ask-scheduler-3] o.s.retry.support.RetryTemplate          : Retry: count=3
2018-10-09 14:53:05.073 DEBUG 15808 --- [ask-scheduler-3] o.s.retry.support.RetryTemplate          : Checking for rethrow: count=4
2018-10-09 14:53:05.073 DEBUG 15808 --- [ask-scheduler-3] o.s.retry.support.RetryTemplate          : Retry failed last attempt: count=4
2018-10-09 14:53:05.077  WARN 15808 --- [ask-scheduler-3] o.s.i.c.MessagePublishingErrorHandler    : Error message was not delivered.

org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.errorChannel'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=ErrorMessage [payload=org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.errorChannel'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: nested exception is org.springframework.messaging.MessageDeliveryException: Failed to send message to channel 'stateChannel' within timeout: 100, failedMessage=GenericMessage [payload=State [id=2426, name=Pamanzi, countryId=141], headers={id=710d13df-dfbb-ba95-8d28-4785d1a1e3d3, timestamp=1539111184973}], failedMessage=GenericMessage [payload=State [id=2426, name=Pamanzi, countryId=141], headers={COUNTRY_ID=141, sequenceNumber=2, COUNTRY=com.biswo.myspringapp.model.Country@5fd4ea2d, sequenceSize=2, correlationId=ab423aa4-c80e-6af0-33a8-f57337f25352, id=a70d6b11-fc23-edd9-7973-1342c3fab8a6, timestamp=1539111183466}], headers={id=ee0ffd29-8e6a-ab5a-6ffb-1016fa38457e, timestamp=1539111185074}], failedMessage=ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: nested exception is org.springframework.messaging.MessageDeliveryException: Failed to send message to channel 'stateChannel' within timeout: 100, failedMessage=GenericMessage [payload=State [id=2426, name=Pamanzi, countryId=141], headers={id=710d13df-dfbb-ba95-8d28-4785d1a1e3d3, timestamp=1539111184973}], failedMessage=GenericMessage [payload=State [id=2426, name=Pamanzi, countryId=141], headers={COUNTRY_ID=141, sequenceNumber=2, COUNTRY=com.biswo.myspringapp.model.Country@5fd4ea2d, sequenceSize=2, correlationId=ab423aa4-c80e-6af0-33a8-f57337f25352, id=a70d6b11-fc23-edd9-7973-1342c3fab8a6, timestamp=1539111183466}], headers={id=ee0ffd29-8e6a-ab5a-6ffb-1016fa38457e, timestamp=1539111185074}], headers={id=34d0e6e6-0af8-1078-5dd7-aa6643ba6c8d, timestamp=1539111185075}] for original GenericMessage [payload=[State [id=2425, name=Mayotte, countryId=141], State [id=2426, name=Pamanzi, countryId=141]], headers={COUNTRY_ID=141, id=ab423aa4-c80e-6af0-33a8-f57337f25352, COUNTRY=com.biswo.myspringapp.model.Country@5fd4ea2d, timestamp=1539111183462}]

Только потому, что ваш errorChannel являетсяDirectChannel и для него нет потребителей.

...