Я использую компонент Apache Camel File для чтения из локального каталога и загрузки в корзину AWS S3.Этот маршрут работал безупречно в прошлом, но в настоящее время возникают проблемы с некоторыми файлами.
В моих исследованиях отладки до сих пор (3 дня страданий) я обнаружил, что маршрут достигает класса DelegateSyncProcessor
и не имеет каких-либо исключений на обмене (для справки см. Соответствующий кодранее упомянутый класс ниже).Учитывая, что нет никаких исключений, я не могу понять, почему методы onCompletion()
никогда не запускаются для определенных файлов.Не было / не было никаких исключений для проблемных файлов;однако файл и блокировка продолжают существовать после выполнения всей логики (включая окончательную передачу сообщения на конечную точку .to()
), намекая на наличие некоторой внутренней проблемы с верблюдом.Я подозреваю это, потому что маршрут ведет себя нормально в каждом аспекте, кроме удаления файла и блокировки.
После включения журналов отладки для Camel, я был разочарован тем, что не было никаких журналов, связанных сошибка при выполнении маршрута.Я хотел бы услышать любой совет о том, что может происходить под капотом.
Несколько дополнительных замечаний:
- Я бегу на верблюде 2.16.0
- Во время отладки верблюжьего кода не было обнаружено легко обнаруживаемых проблем.
- Логика в разделах
.process()
работает полностью (некоторые исключения возникали, но они правильно обрабатываются.
Обновление:
Больше отладки с включенными журналами отладки для верблюда не дало никаких результатов. Я только обнаружил, что логика в моем коде .process()
является проблемой. Когда код занимает слишком много времени длявыполнить (даже без исключений), удаление файла завершится неудачей.
Обновление 2:
Я обнаружил, где верблюжий маршрут фактически разваливается. Класс CamelEventLogger.java
полностью терпит неудачу при попытке"logEvent". Связанный код с этим классом добавляется ниже. Когда код достигает matcher.find()
, время ожидания истекает.
MyRouteClass.java:
from(importProcessingEndpoint)
.convertBodyTo(byte[].class)
.process((exchange) -> {
// some logic here
})
.to(outgoingEndpoint)
.threads(MAX_NUMBER_OF_CAMEL_THREADS)
.process((exchange) -> {
// some logic here
})
.log("Finished processing import file.")
.to(outgoingEndpoint);
DelegateSyncProcessor.java:
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
// force calling the sync method
try {
processor.process(exchange);
} catch (Throwable e) {
// must catch throwable so we catch all
exchange.setException(e);
} finally {
// we are bridging a sync processor as async so callback with true
callback.done(true);
}
return true;
}
CamelEventLogger.java
private void logEvent(final String label, final Exchange exchange, final Endpoint endpoint,
final long elapsedTime, final boolean logTID) {
Matcher matcher = PATTERN.matcher(extractMessage(exchange));
if (matcher.find()) {
//CHECKSTYLE:OFF
String evtType = matcher.group(1);
String evtName = matcher.group(2);
String guid = matcher.group(3);
String tid = matcher.group(4);
//CHECKSTYLE:ON
if (tid == null || !logTID) {
tid = "";
} else {
tid = "intuit_tid=" + tid;
}
log.info(LOG_FORMAT, label, exchange.getExchangeId(), guid, evtName, evtType, endpoint, elapsedTime, tid);
} else { // the message is not parseable, fall back to minimum log entry
log.info("Event {} id={} {} elapsedTimeMs={}", label, exchange.getExchangeId(), endpoint, elapsedTime);
}
}