Apache Тайм-аут на верблюде - PullRequest
       66

Apache Тайм-аут на верблюде

0 голосов
/ 15 января 2020

У меня есть два маршрута Camel, настроенных в XML и вставленных ниже: -

Маршрут 1:

<camel:route id="statementsArchivingPollRoute">
    <camel:from uri="timer://tempQueue?fixedRate=true&amp;period=30s&amp;delay=30s"/>
    <camel:transacted ref="PROPAGATION_REQUIRED">
    <camel:process ref="statementsArchivingRequestZipProcessor"/>
    <camel:choice>
        <camel:when>
            <camel:simple>${body.size} >= 1</camel:simple>
            <camel:split>
                <camel:simple>${body}</camel:simple>
                <camel:marshal ref="archiveFileInterfaceMetadataMapper"/>
                <camel:to pattern="InOnly"
                          uri="activemq:{{ccs.activemq.queue.prefix}}.sr.archive.bulk.ingestion.req?jmsMessageType=Text"/>
            </camel:split>
            <camel:log loggingLevel="INFO" message="Archiving content was processed"/>
        </camel:when>
        <camel:otherwise>
            <camel:log loggingLevel="INFO" message="No archiving content to process"/>
        </camel:otherwise>
    </camel:choice>
    </camel:transacted>
</camel:route>

Маршрут 2:

<camel:route id="statementsArchivingBulkIngestionRequestRoute">
    <camel:from uri="activemq:{{ccs.activemq.queue.prefix}}.sr.archive.bulk.ingestion.req"/>
    <camel:throttle timePeriodMillis="4000">
    <camel:constant>1</camel:constant>
    <camel:setExchangePattern pattern="InOnly"/>
    <camel:unmarshal ref="archiveFileInterfaceMetadataMapper"/>
    <camel:bean ref="archiveFileEntryTransformer" method="transform"/>
    <camel:setHeader headerName="CamelHttpMethod">
        <camel:constant>POST</camel:constant>
    </camel:setHeader>
    <camel:toD uri="{{ccs.bulk.ingestion.service.ingest.archive.file}}"/>
    </camel:throttle>
</camel:route>

Процессор по первому маршруту возвращает список объектов запроса. Затем список разделяется, и каждый запрос сортируется и помещается в очередь.

Второй маршрут прослушивает эту очередь. Когда он удаляет сообщение из очереди, он демарширует его, выполняет преобразование, а затем использует его для отправки запроса на публикацию в другую службу. Я ограничиваю этот маршрут, чтобы он обрабатывал только одно сообщение в секунду, чтобы не перегружать нисходящую службу.

Все это прекрасно работает, когда список содержит только несколько запросов и, следовательно, только несколько сообщений попадают в очередь. , но если в списке много элементов, маршрут 2 раза истекает, и появляется следующая запись в журнале:

Atomikos:12] c.a.icatch.imp.ActiveStateHandler        : Timeout/setRollbackOnly of ACTIVE coordinator !

Тайм-аут приводит к повторению процесса, и нисходящая служба заканчивается вызовом несколько раз за сообщение, а не один раз.

Я не могу понять, почему количество вызовов маршрута 2 должно вызывать тайм-аут. Я думал, что один экземпляр маршрута будет запущен для каждого сообщения, находящегося в очереди из activemq. Если отдельное сообщение занимает много времени, то я бы понял, но ясно, что время ожидания основано на совокупном времени всех сообщений, находящихся в очереди.

Я довольно новичок в Верблюде, и я ясно что-то неправильно понял с архитектурной точки зрения. Я был бы чрезвычайно признателен за любые указания о том, как остановить эти тайм-ауты. Спасибо за чтение.

1 Ответ

1 голос
/ 15 января 2020

Джон, вы можете попробовать следующее, чтобы исследовать

  1. отключить / прокомментировать 2-й маршрут. Целью использования activemq было бы сделать процесс асинхронным c, что означает, что 1-й маршрут не должен иметь никакого влияния из-за 2-го маршрута. Если это работает без маршрута 2, то проблема в другом месте.

  2. Если вы обнаружите, что 1-й маршрут работает нормально без 2-го маршрута, то следующим будет попытаться установить меньшее количество потоков во 2-м маршруте. Можно поставить 1 или 2 темы и посмотреть, поможет ли это. Я думаю, что это конфликт ActiveMQ, а не эти конфигурации маршрутов.

  3. Проверьте размер полезной нагрузки, которую вы отправляете в activemq. Если вы публикуете в activemq очень большое сообщение, которое также может повлиять на большое количество элементов, и каждый элемент имеет большой размер, что приводит к конфликту в activemq, и транзакция занимает больше времени, чем время ожидания.

Если вы отправляете большой набор данных в activemq, возможно, вы захотите вернуться к дизайну. Сохраните полезную нагрузку в некоторой персистентной области (db / file / cache) и отправьте события уведомления, содержащие только ссылку на полезную нагрузку и некоторые метаданные. 2-й маршрут может затем взять ссылку из события и извлечь полезную нагрузку из того места, где он был сохранен по маршруту 1.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...