Интеграция Spring заставляет асинхронный шлюз ожидать результата агрегации - PullRequest
0 голосов
/ 20 марта 2020

Я использую пружинную интеграцию для обработки файлов. Я хочу читать файл построчно, фильтровать, конвертировать и отправлять в Кафку. Файл передается на обработку через асинхронный шлюз. В конце обработки я добавил агрегатор для 2 целей:

  • Я хочу дождаться обработки всего файла. То есть все правильно обработанные строки были отправлены в kafka, и все строки, вызвавшие ошибки, были обработаны потоком, который читает из канала ошибок.
  • В качестве возвращаемого значения для шлюза я хочу получить агрегированные результаты для обработки файлов , Сколько строк было прочитано из файла, сколько обработано без ошибок и т. Д. c.

Что я сделал до сих пор, я создал агрегатор, который ожидает файловые маркеры START и END + все обработанные строки , Я создал пользовательскую группу сообщений, которая считает обработанные строки, а также сохраняет replyChannel, которое берется из файла маркера START, и количество строк, которое берется из файла маркера END. Для группы также установлено время ожидания 5 с. Когда группа завершает, она выдает сообщение со статистикой, которую я хочу, и заголовок replyChannel со значением, взятым из маркера START. Насколько я понимаю, шлюз ждет ответа на это replyChannel. Вот код для группы сообщений:

public void add(Message<?> messageToAdd) {
        if(messageToAdd.getPayload() instanceof FileMarker) {
            FileMarker marker = (FileMarker) messageToAdd.getPayload();
            switch (marker.getMark()) {
                case START:
                    replyChannel = messageToAdd.getHeaders().get(MessageHeaders.REPLY_CHANNEL);
                    break;
                case END:
                    expectedRowCount.set(marker.getLineCount());
                    break;
                default:
                    throw new IllegalStateException("Unexpected file mark");
            }
        } else {
            ProcessingResult processingResult = messageToAdd.getHeaders()
                    .get(ProcessingConstants.PROCESSING_RESULT_HEADER, ProcessingResult.class);
            assert processingResult != null;
            rowCount.incrementAndGet();
            switch (processingResult) {
                case FILTERED:
                    filteredCount.incrementAndGet();
                    break;
                case PROCESSED:
                    processedCount.incrementAndGet();
                    break;
                case PROCESSING_ERROR:
                    processingErrorCount.incrementAndGet();
                    break;
                case KAFKA_ERROR:
                    kafkaErrorCount.incrementAndGet();
                    break;
                default:
                    throw new IllegalStateException("Unrecognized processing result: " + processingResult);
            }
        }
    }

У меня проблема в том, что мой шлюз никогда не получает ответ и ждет бесконечно. Как я могу заставить асинхронный шлюз ждать, пока агрегатор отправит свое сообщение и получить полезную нагрузку этого сообщения в результате обработки шлюза?

Тест, воссоздающий проблему, можно найти здесь https://github.com/hawk1234/spring-integration-example commit 9f121f0729d8076872e6fbdcd7b1b91ca9ea8cb4 . При запуске тестов журналы приложений доступны по пути build / logs / spring -gration-example.log . В настоящее время тест зависает, так как шлюз никогда не получает ответ. Также весь поток находится в стадии разработки, поэтому групповые релизы только после истечения времени ожидания.

Ответы [ 2 ]

0 голосов
/ 23 марта 2020

Мне удалось решить проблему. Мне пришлось вручную отправить агрегированное сообщение на MessageHeaders.REPLY_CHANNEL, чтобы сделать это, я использовал метод BaseIntegrationFlowDefinition#logAndReply. Исправление, как и законченный пример, доступно с коммитом eefd5f472891ded39f363ff71ec530ada800f704 .

0 голосов
/ 20 марта 2020

Я создал пользовательскую группу сообщений, которая

Это что-то не так.

Вы должны сделать все необходимые логи c, когда у вас уже есть все сообщения (включая FileMarker) в группе, в стандартной группе. Существует MessageGroupProcessor, который вызывается, когда ReleaseStrategy возвращает true. Итак, здесь у вас есть все сообщения для группы, и вы можете выполнить весь необходимый сбор статистики для получения желаемого ответа.

Ваше приложение слишком сложное, чтобы его быстро переварить. Что я вижу, что вы не отправляете ошибки агрегатору. И это должно быть где-то в верхнем потоке, а не только в подпотоке FileMarker. Я имею в виду, что было бы лучше иметь агрегатор в отдельном потоке верхнего уровня и действительно отправлять FileMarker непосредственно в него, а остальные из sendSuccessChannel(), а также после ошибок обработки.

...