ConcurrentModificationException in DefaultExchangeFormatter when logging in Apache Camel
18 May 2019

I have a Camel route that logs the exchange headers, and occasionally this throws a ConcurrentModificationException at org.apache.camel.processor.DefaultExchangeFormatter.format(DefaultExchangeFormatter.java:120). The project uses org.apache.camel:camel-cxf:2.16.5 and runs on apache-servicemix 6.1.4.

I found a similar bug, https://issues.apache.org/jira/browse/CAMEL-8829, but it should already be fixed in version 2.16.

Stack trace of the error:

2019-04-17 15:13:05,489 | ERROR | ead #241 - Split | ReportError                      | 151 - org.apache.camel.camel-core - 2.16.5 | java.util.ConcurrentModificationException
    at java.util.HashMap$HashIterator.nextNode(HashMap.java:1442)
    at java.util.HashMap$EntryIterator.next(HashMap.java:1476)
    at java.util.HashMap$EntryIterator.next(HashMap.java:1474)
    at java.util.AbstractMap.toString(AbstractMap.java:554)
    at java.lang.String.valueOf(String.java:2994)
    at java.lang.StringBuilder.append(StringBuilder.java:131)
    at java.util.AbstractMap.toString(AbstractMap.java:559)
    at java.lang.String.valueOf(String.java:2994)
    at java.lang.StringBuilder.append(StringBuilder.java:131)
    at org.apache.camel.processor.DefaultExchangeFormatter.format(DefaultExchangeFormatter.java:120)
    at org.apache.camel.processor.CamelLogProcessor.process(CamelLogProcessor.java:76)
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:109)
    at org.apache.camel.processor.CamelLogProcessor.process(CamelLogProcessor.java:71)
    at org.apache.camel.component.log.LogProducer.process(LogProducer.java:39)
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:145)
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:460)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:196)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:121)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:196)
    at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:62)
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:145)
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:460)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:196)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:121)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
    at org.apache.camel.processor.ChoiceProcessor.process(ChoiceProcessor.java:117)
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:460)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:196)
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:460)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:196)
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:109)
    at org.apache.camel.processor.MulticastProcessor.doProcessParallel(MulticastProcessor.java:823)
    at org.apache.camel.processor.MulticastProcessor.access$200(MulticastProcessor.java:84)
    at org.apache.camel.processor.MulticastProcessor$1.call(MulticastProcessor.java:319)
    at org.apache.camel.processor.MulticastProcessor$1.call(MulticastProcessor.java:304)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
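
A note on reading the trace: the two nested `AbstractMap.toString` frames suggest that the formatter was rendering a map whose value is itself a `HashMap`, and that this inner map changed while it was being iterated. The sketch below reproduces that failure mode with the JDK only. It is a simplified assumption, not Camel's code: the header name `someHeader` is hypothetical, and a value that mutates the map from its own `toString()` stands in for the second thread so that the failure is deterministic.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

public class NestedHeaderToStringDemo {

    /** Renders the headers through toString(), as string concatenation does. */
    public static String render(Map<String, Object> headers) {
        return "Headers: " + headers;
    }

    /** Returns true if rendering fails with a ConcurrentModificationException. */
    public static boolean failsWhenMutatedDuringRender() {
        Map<String, Object> nested = new HashMap<>();

        // Stand-in for "another thread": rendering this value adds a key
        // to the nested map while its iterator is still in use.
        Object intruder = new Object() {
            @Override
            public String toString() {
                nested.put("addedMeanwhile", "x");
                return "value";
            }
        };
        // Two entries, so the iterator must advance after the mutation.
        nested.put("first", intruder);
        nested.put("second", intruder);

        Map<String, Object> headers = new HashMap<>();
        headers.put("someHeader", nested); // hypothetical header name

        try {
            render(headers);
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("CME thrown: " + failsWhenMutatedDuringRender());
    }
}
```

In the route, the mutation would presumably come from another parallel branch that holds a reference to the same nested map instance, which would also explain why the error appears only occasionally.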

The main route of the message broker delegates operations to another module:

    <camelContext xmlns="http://camel.apache.org/schema/spring">
        <propertyPlaceholder id="properties" location="ref:preProps" />

        <threadPool id="mbkPool" threadName="mbkPool" poolSize="20" maxPoolSize="100" keepAliveTime="180" />

        <onException>
            <exception>java.lang.Exception</exception>
            <log message="Error in MBK. ${property.CamelExceptionCaught} ${exception.message}" loggingLevel="ERROR"
                 logName="log:error" />
            <log message="Stacktrace: ${exception.stacktrace}" loggingLevel="ERROR" logName="log:error" />
        </onException>

        <route streamCache="true">
            <from uri="cxf:bean:queryServiceEndpoint" />
            <convertBodyTo type="a.b.c.Body"/>
            <process ref="aProcessor" />
            <process ref="sendProcessor" />
            <log message="Invoking for ${property.invokeSystems}" logName="BeforeInvoke" loggingLevel="DEBUG" />
            <to uri="log:BeforeInvoke?level=DEBUG&amp;showAll=true" />
            <recipientList ignoreInvalidEndpoints="true" delimiter="," executorServiceRef="mbkPool" parallelProcessing="true">
                <exchangeProperty>invokeSystems</exchangeProperty>
            </recipientList>
            <process ref="responseQueryProcessor" />
            <to uri="log:AfterInvoke?level=DEBUG" />
        </route>
    </camelContext>

The module that throws the error:

        <route streamCache="true">
            <from uri="vm:invoke?concurrentConsumers={{concurrentConsumers}}" />
            <to uri="log:BeginPartialQuery?level=DEBUG" />
            <split parallelProcessing="true">
                <log message="queryType = OBJECT" loggingLevel="DEBUG" logName="log:ObjectOperation" />
                <to uri="direct:processCepSyncOperation" />
            </split>
        </route>
        <route streamCache="true">
            <from uri="direct:processSyncOperation" />
            <to uri="log:BeginProcessLevel=DEBUG" />
            <setProperty propertyName="operation">
                <xpath resultType="java.lang.String">/operation</xpath>
            </setProperty>
            <setBody>
                <simple>${property.bodyHolder}</simple>
            </setBody>
            <to uri="log:ProcessSyncOperation?level=DEBUG&amp;showHeaders=true" />
            <to uri="xslt:aaaa.xslt" />
            <to uri="log:afterXslt?level=DEBUG" />
            <to uri="direct:processRequest" />
        </route>

I found that the line <to uri="log:ProcessSyncOperation?level=DEBUG&amp;showHeaders=true" /> is probably what causes the error.

Is there a way to work around this problem? (I know I can remove this log statement, but perhaps there are other options.)
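
One possible direction, sketched under the assumption that the failing header value is a plain `HashMap` shared between the parallel branches: replace such nested maps with `ConcurrentHashMap` copies before the exchange fans out. `ConcurrentHashMap` iterators are weakly consistent and do not throw `ConcurrentModificationException`. The helper `withConcurrentNestedMaps` and the header name `someHeader` are hypothetical and not part of Camel. Whether this fits the route depends on which header actually holds the map, which the post does not show.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class HeaderCopyDemo {

    /**
     * Returns a copy of the headers in which every nested Map value is
     * replaced by a ConcurrentHashMap copy. Hypothetical helper, not a Camel API.
     * Note: ConcurrentHashMap rejects null keys and null values.
     */
    public static Map<String, Object> withConcurrentNestedMaps(Map<String, Object> headers) {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<String, Object> entry : headers.entrySet()) {
            Object value = entry.getValue();
            if (value instanceof Map) {
                result.put(entry.getKey(),
                        new ConcurrentHashMap<Object, Object>((Map<?, ?>) value));
            } else {
                result.put(entry.getKey(), value);
            }
        }
        return result;
    }

    /** Renders headers while a nested map is mutated; returns true if no exception occurs. */
    public static boolean rendersWhileMutated() {
        Map<String, Object> nested = new HashMap<>();
        nested.put("first", "a");
        nested.put("second", "b");
        Map<String, Object> headers = new HashMap<>();
        headers.put("someHeader", nested); // hypothetical header name

        Map<String, Object> safe = withConcurrentNestedMaps(headers);
        @SuppressWarnings("unchecked")
        Map<Object, Object> safeNested = (Map<Object, Object>) safe.get("someHeader");

        // Stand-in for another thread: rendering this value adds a key mid-iteration.
        safeNested.put("intruder", new Object() {
            @Override
            public String toString() {
                safeNested.put("addedMeanwhile", "x");
                return "value";
            }
        });

        String rendered = "Headers: " + safe; // a plain HashMap may throw here
        return rendered.contains("someHeader") && rendered.contains("value");
    }

    public static void main(String[] args) {
        System.out.println("Rendered without exception: " + rendersWhileMutated());
    }
}
```

The copy itself iterates the original map, so it would have to run on a single thread before the parallel split starts, for example in a processor placed ahead of it. Logging only the specific headers of interest instead of using `showHeaders=true` is the simpler alternative when the full header dump is not needed.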

...