У меня есть два маршрута Camel, настроенных в XML и вставленных ниже: -
Маршрут 1:
<camel:route id="statementsArchivingPollRoute">
<camel:from uri="timer://tempQueue?fixedRate=true&period=30s&delay=30s"/>
<camel:transacted ref="PROPAGATION_REQUIRED">
<camel:process ref="statementsArchivingRequestZipProcessor"/>
<camel:choice>
<camel:when>
<camel:simple>${body.size} >= 1</camel:simple>
<camel:split>
<camel:simple>${body}</camel:simple>
<camel:marshal ref="archiveFileInterfaceMetadataMapper"/>
<camel:to pattern="InOnly"
uri="activemq:{{ccs.activemq.queue.prefix}}.sr.archive.bulk.ingestion.req?jmsMessageType=Text"/>
</camel:split>
<camel:log loggingLevel="INFO" message="Archiving content was processed"/>
</camel:when>
<camel:otherwise>
<camel:log loggingLevel="INFO" message="No archiving content to process"/>
</camel:otherwise>
</camel:choice>
</camel:transacted>
</camel:route>
Маршрут 2:
<camel:route id="statementsArchivingBulkIngestionRequestRoute">
<camel:from uri="activemq:{{ccs.activemq.queue.prefix}}.sr.archive.bulk.ingestion.req"/>
<camel:throttle timePeriodMillis="4000">
<camel:constant>1</camel:constant>
<camel:setExchangePattern pattern="InOnly"/>
<camel:unmarshal ref="archiveFileInterfaceMetadataMapper"/>
<camel:bean ref="archiveFileEntryTransformer" method="transform"/>
<camel:setHeader headerName="CamelHttpMethod">
<camel:constant>POST</camel:constant>
</camel:setHeader>
<camel:toD uri="{{ccs.bulk.ingestion.service.ingest.archive.file}}"/>
</camel:throttle>
</camel:route>
Процессор по первому маршруту возвращает список объектов запроса. Затем список разделяется, и каждый запрос сортируется и помещается в очередь.
Второй маршрут прослушивает эту очередь. Когда он удаляет сообщение из очереди, он демарширует его, выполняет преобразование, а затем использует его для отправки запроса на публикацию в другую службу. Я ограничиваю этот маршрут, чтобы он обрабатывал только одно сообщение в секунду, чтобы не перегружать нисходящую службу.
Все это прекрасно работает, когда список содержит только несколько запросов и, следовательно, только несколько сообщений попадают в очередь. , но если в списке много элементов, маршрут 2 раза истекает, и появляется следующая запись в журнале:
Atomikos:12] c.a.icatch.imp.ActiveStateHandler : Timeout/setRollbackOnly of ACTIVE coordinator !
Тайм-аут приводит к повторению процесса, и нисходящая служба заканчивается вызовом несколько раз за сообщение, а не один раз.
Я не могу понять, почему количество вызовов маршрута 2 должно вызывать тайм-аут. Я думал, что один экземпляр маршрута будет запущен для каждого сообщения, находящегося в очереди из activemq. Если отдельное сообщение занимает много времени, то я бы понял, но ясно, что время ожидания основано на совокупном времени всех сообщений, находящихся в очереди.
Я довольно новичок в Верблюде, и я ясно что-то неправильно понял с архитектурной точки зрения. Я был бы чрезвычайно признателен за любые указания о том, как остановить эти тайм-ауты. Спасибо за чтение.