Я использую пружинную интеграцию для обработки файлов. Я хочу читать файл построчно, фильтровать, конвертировать и отправлять в Кафку. Файл передается на обработку через асинхронный шлюз. В конце обработки я добавил агрегатор для 2 целей:
- Я хочу дождаться обработки всего файла. То есть все правильно обработанные строки были отправлены в kafka, и все строки, вызвавшие ошибки, были обработаны потоком, который читает из канала ошибок.
- В качестве возвращаемого значения для шлюза я хочу получить агрегированные результаты для обработки файлов , Сколько строк было прочитано из файла, сколько обработано без ошибок и т. Д. c.
Что я сделал до сих пор, я создал агрегатор, который ожидает файловые маркеры START и END + все обработанные строки , Я создал пользовательскую группу сообщений, которая считает обработанные строки, а также сохраняет replyChannel
, которое берется из файла маркера START, и количество строк, которое берется из файла маркера END. Для группы также установлено время ожидания 5 с. Когда группа завершает, она выдает сообщение со статистикой, которую я хочу, и заголовок replyChannel
со значением, взятым из маркера START. Насколько я понимаю, шлюз ждет ответа на это replyChannel
. Вот код для группы сообщений:
public void add(Message<?> messageToAdd) {
if(messageToAdd.getPayload() instanceof FileMarker) {
FileMarker marker = (FileMarker) messageToAdd.getPayload();
switch (marker.getMark()) {
case START:
replyChannel = messageToAdd.getHeaders().get(MessageHeaders.REPLY_CHANNEL);
break;
case END:
expectedRowCount.set(marker.getLineCount());
break;
default:
throw new IllegalStateException("Unexpected file mark");
}
} else {
ProcessingResult processingResult = messageToAdd.getHeaders()
.get(ProcessingConstants.PROCESSING_RESULT_HEADER, ProcessingResult.class);
assert processingResult != null;
rowCount.incrementAndGet();
switch (processingResult) {
case FILTERED:
filteredCount.incrementAndGet();
break;
case PROCESSED:
processedCount.incrementAndGet();
break;
case PROCESSING_ERROR:
processingErrorCount.incrementAndGet();
break;
case KAFKA_ERROR:
kafkaErrorCount.incrementAndGet();
break;
default:
throw new IllegalStateException("Unrecognized processing result: " + processingResult);
}
}
}
У меня проблема в том, что мой шлюз никогда не получает ответ и ждет бесконечно. Как я могу заставить асинхронный шлюз ждать, пока агрегатор отправит свое сообщение и получить полезную нагрузку этого сообщения в результате обработки шлюза?
Тест, воссоздающий проблему, можно найти здесь https://github.com/hawk1234/spring-integration-example commit 9f121f0729d8076872e6fbdcd7b1b91ca9ea8cb4 . При запуске тестов журналы приложений доступны по пути build / logs / spring -gration-example.log . В настоящее время тест зависает, так как шлюз никогда не получает ответ. Также весь поток находится в стадии разработки, поэтому групповые релизы только после истечения времени ожидания.