Использование Spring Integration executorChannel с функцией Spring Cloud - PullRequest
0 голосов
/ 13 апреля 2020

Я использую функцию Spring Cloud для обработки данных из kafka с Flux. По умолчанию он обрабатывает данные в потоке-получателе (где используется сообщение). Я собираюсь реализовать пул потоков для обработки и регулирования данных параллельного интерфейса, и в Spring Cloud Integration есть отличная реализация, называемая executorChannel (https://docs.spring.io/spring-integration/api/org/springframework/integration/channel/ExecutorChannel.html)

Пример реализации функции:

public static class FN1 implements Function<Flux<String>, Flux<String>> {
  public Flux<String> apply(Flux<String> data) {
    return data
      .map(f ->  doSomething() )      
  }
}

Поэтому я не нашел простого способа соединить функции, реализованные таким образом, через executorChannel.

Мб, есть способ определить тип inputChannel?

UPD: читать комментарии под ответом Олега. Они очень полезны.

1 Ответ

0 голосов
/ 13 апреля 2020

Вы имеете в виду что-то вроде этого?

@SpringBootApplication
public class SampleFunctoinAppApplication  {

    public static void main(String[] args) throws Exception {

        ApplicationContext context = SpringApplication.run(SampleFunctoinAppApplication.class, args);
        SubscribableChannel output = context.getBean("output", SubscribableChannel.class);
        output.subscribe(System.out::println);

        MessageChannel channel = context.getBean("executorChannel", MessageChannel.class);
        channel.send(new GenericMessage<String>("hello"));
    }

    @Bean
    public IntegrationFlow flow() {
        return IntegrationFlows
                .from("executorChannel")
                .transform(echo())
                .channel("output")
                .get();
    }

    @Bean
    public ExecutorChannel executorChannel() {
        return new ExecutorChannel(Executors.newCachedThreadPool());
    }

    public Function<String, String> echo() {
        return v -> v;
    }
}

Что вы подразумеваете под "определить тип inputChannel"?

...