Приложения потокового процессора / приемника SCDF не работают, если имена привязок функций не совпадают - PullRequest
0 голосов
/ 04 апреля 2020

Это продолжение предыдущего вопроса о переполнении стека

Добавление дополнительной информации для последующего выпуска.

Spring Boot версия 2.2.4 Облачная версия Hoxton.SR1

My Processor App

@SpringBootApplication
public class FunctionStreamSampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(FunctionStreamSampleApplication.class, args);
    }

    @Bean
    public Function<String, String> messenger() {
        return data -> "Hello : " + data.toUpperCase() + "!";
    }

}

Конфигурация процессора:

spring:
  cloud:
    stream:
      function:
        definition: messenger
        bindings:
          messenger-in-0: input
          messenger-out-0: output

Consumer App

@SpringBootApplication
public class ConsumerStreamSampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConsumerStreamSampleApplication.class, args);
    }

    @Bean
    public Consumer<String> sample() {
         return data -> System.out.println("Sink : " + data + "!");
    }

}

Consumer Config

spring:
  cloud:
    stream:
      function:
        definition: sample
        bindings:
          sample-in-0: input

Я регистрирую приложения Processor и Sink в SCDF

Создать поток как: kafkaQ> customProcessor | customSink

При развертывании потока модуль Sink не запускается. Custom Sink имеет следующее сообщение об ошибке в журнале.

c.f.c.c.BeanFactoryAwareFunctionRegistry : Looking up function 'messenger' with acceptedOutputTypes: []
c.f.c.c.BeanFactoryAwareFunctionRegistry : !!! Failed to discover function 'messenger' in function catalog. Function available in catalog are: [sample, functionRouter]

Это же приложение-приемник запускает и потребляет сообщение в журнале о выходе процессора, если я переименую привязку функции потребителя, а также в "messenger"

@Bean
    public Consumer<String> messenger() {
         return data -> System.out.println("Sink : " + data + "!");
    }

Можно ли узнать, Существуют и другие свойства конфигурации, которые необходимо изменить, чтобы у разных процессоров и приемников не было одинаковых имен.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...