Mule FailedToQueueEventException Кварцевое соединение - PullRequest
0 голосов
/ 11 апреля 2020

У меня ниже входящий компонент кварц, чтобы вызвать событие kafka. Но, похоже, SEDA выбрасывает в очередь исключение.

<quartz:connector name="myQuartzConnector" validateConnections="true">
    <receiver-threading-profile maxThreadsActive="1"/>
</quartz:connector>

<flow name="quartz-scheduler-kafka-consumer-trigger-flow">
    <quartz:inbound-endpoint jobName="Trigger-Kafka-Consumer-Quartz-Job" repeatInterval="1" responseTimeout="10000" connector-ref="myQuartzConnector" doc:name="Quartz">
        <quartz:event-generator-job/>
    </quartz:inbound-endpoint>
    <component class="org.my.myKafkaCOnsumer" doc:name="Java KafkaConsumer"/>
</flow>

Кварц используется для запуска потока потребителей Kafka. Элемент управления не возвращается обратно в планировщик до тех пор, пока подключение клиента Kafka не завершится в компоненте java. Потребительская связь Kafka никогда не закончится, поскольку она является рекурсивной, пока (true) l oop. Случайно, если соединения Kafka заканчиваются, кварцевый планировщик должен повторно запустить компонент java, который открывает соединение kafka.

Message               : The queue for 'SEDA Stage quartz-scheduler-kafka-consumer-trigger-flow.stage1' did not accept new event within 30000 MILLISECONDS.
Payload               : {NullPayload}
Payload Type          : org.mule.transport.NullPayload
Element               : null @ message-gateway-profile-update-api:null:null
--------------------------------------------------------------------------------
Root Exception stack trace:
org.mule.api.service.FailedToQueueEventException: The queue for 'SEDA Stage quartz-scheduler-kafka-consumer-trigger-flow.stage1' did not accept new event within 30000 MILLISECONDS.
    at org.mule.processor.SedaStageInterceptingMessageProcessor.enqueue(SedaStageInterceptingMessageProcessor.java:139)
    at org.mule.processor.SedaStageInterceptingMessageProcessor.processNextAsync(SedaStageInterceptingMessageProcessor.java:102)
    at org.mule.processor.AsyncInterceptingMessageProcessor.process(AsyncInterceptingMessageProcessor.java:103)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)

Снимок потока

Ответы [ 2 ]

0 голосов
/ 11 апреля 2020

Ошибка вызвана тем, что конечная точка Quartz настроена на запуск быстрее, чем поток может обработать сообщения. Этот поток имеет стратегию обработки по умолчанию асинхронный в очереди, что означает, что события, инициируемые конечной точкой Quartz, отправляются в очередь SEDA , а затем обрабатываются потоками потока как доступные. Конечная точка кварца настроена на повторение каждые 1 мс, что очень мало. В это время вероятность обработки компонента очень мала. Когда пул потоков, используемый потоком, исчерпан, тогда очередь SEDA начинает заполняться. Когда элементы в очереди превышают тайм-аут по умолчанию, чтобы получить выполнение потока, вы получаете сообщение об ошибке. Эта проблема описана в КБ https://help.mulesoft.com/s/article/Error-The-queue-for-SEDA-queue-name-did-not-accept-new-event-within-30000-MILLISECONDS

Можно изменить стратегию обработки потока на синхронную, чтобы повторно использовать очередь из соединителя для выполнения и избежать очереди, но, как представляется, repeatInterval быть нереально малым.

На заметку о том, что Кварцевый разъем долгое время считался устаревшим go. Он был заменен областью Опрос .

0 голосов
/ 11 апреля 2020

Элемент управления не возвращается обратно в расписание

Это неверное утверждение. Планировщик понятия не имеет, как заканчивается предыдущий поток. Фактически старый поток продолжает выполняться, и запускается событие для нового процесса. Вы можете увидеть это, если вы поместите регистратор в начало потока в качестве первого компонента после планировщика.

На самом деле, поскольку вы слушаете сообщения, он не будет планировщиком. Слушатель должен быть источником потока. Это должно выглядеть так:

<flow name="quartz-scheduler-kafka-consumer-trigger-flow">
    <component class="org.my.myKafkaCOnsumer" doc:name="Java KafkaConsumer"/>
</flow>

enter image description here

Вот еще немного о нескольких расписаниях https://simpleflatservice.com/mule4/Multipleschedules.html

...