У меня ниже входящий компонент кварц, чтобы вызвать событие kafka. Но, похоже, SEDA выбрасывает в очередь исключение.
<quartz:connector name="myQuartzConnector" validateConnections="true">
<receiver-threading-profile maxThreadsActive="1"/>
</quartz:connector>
<flow name="quartz-scheduler-kafka-consumer-trigger-flow">
<quartz:inbound-endpoint jobName="Trigger-Kafka-Consumer-Quartz-Job" repeatInterval="1" responseTimeout="10000" connector-ref="myQuartzConnector" doc:name="Quartz">
<quartz:event-generator-job/>
</quartz:inbound-endpoint>
<component class="org.my.myKafkaCOnsumer" doc:name="Java KafkaConsumer"/>
</flow>
Кварц используется для запуска потока потребителей Kafka. Элемент управления не возвращается обратно в планировщик до тех пор, пока подключение клиента Kafka не завершится в компоненте java. Потребительская связь Kafka никогда не закончится, поскольку она является рекурсивной, пока (true) l oop. Случайно, если соединения Kafka заканчиваются, кварцевый планировщик должен повторно запустить компонент java, который открывает соединение kafka.
Message : The queue for 'SEDA Stage quartz-scheduler-kafka-consumer-trigger-flow.stage1' did not accept new event within 30000 MILLISECONDS.
Payload : {NullPayload}
Payload Type : org.mule.transport.NullPayload
Element : null @ message-gateway-profile-update-api:null:null
--------------------------------------------------------------------------------
Root Exception stack trace:
org.mule.api.service.FailedToQueueEventException: The queue for 'SEDA Stage quartz-scheduler-kafka-consumer-trigger-flow.stage1' did not accept new event within 30000 MILLISECONDS.
at org.mule.processor.SedaStageInterceptingMessageProcessor.enqueue(SedaStageInterceptingMessageProcessor.java:139)
at org.mule.processor.SedaStageInterceptingMessageProcessor.processNextAsync(SedaStageInterceptingMessageProcessor.java:102)
at org.mule.processor.AsyncInterceptingMessageProcessor.process(AsyncInterceptingMessageProcessor.java:103)
at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
Снимок потока