Splitter прерывается во время исключения без обработки последующих сообщений - PullRequest
0 голосов
/ 08 мая 2018

У меня есть требование разделять сообщения и обрабатывать их по одному.Если какое-либо из сообщений не сработало, я хотел бы сообщить об этом на канал ошибок и возобновить обработку следующих доступных сообщений

Я использую Spring Cloud AWS Stream Starter с 1.0.0-SNAPSHOT

Iнаписал пример программы с использованием сплиттера

     @Bean
      public MessageChannel channelSplitOne() {
        return new DirectChannel();
      }


      @StreamListener(INTERNAL_CHANNEL)
      public void channelOne(String message) {
        if (message.equals("l")) {
          throw new RuntimeException("Error due to l");
        }
        System.out.println("Internal: " + message);
      }


      @Splitter(inputChannel = Sink.INPUT, outputChannel = INTERNAL_CHANNEL)
      public List<Message> extractItems(Message<String> input) {
        return Arrays.stream(input.getPayload().split(""))
            .map(s -> MessageBuilder.withPayload(s).copyHeaders(input.getHeaders()).build())
            .collect(Collectors.toList());
      }

Когда я отправляю сообщение как Hello, ожидается, что 'h', 'e', ​​'o' будут обработаны, но 'l' будет сообщено какошибка.

Но здесь после 'l' обработка не возобновляется.

Есть ли способ добиться этого.

1 Ответ

0 голосов
/ 08 мая 2018

Вы можете сделать это, но с @ServiceActivator вместо @StreamListener. У первого есть опция adviceChain, где вы можете ввести ExpressionEvaluatingRequestHandlerAdvice: https://docs.spring.io/spring-integration/docs/5.0.4.RELEASE/reference/html/messaging-endpoints-chapter.html#expression-advice.

Проблема в том, что сплиттер похож на обычный цикл в Java, поэтому для продолжения после ошибки нам нужно как-то добавить попытку ... поймать там. Но это уже не ответственность сплиттера. Поэтому мы должны перенести такую ​​логику туда, где у нас есть проблема с ошибкой.

...