Это продолжение предыдущего вопроса о переполнении стека
Добавление дополнительной информации для последующего выпуска.
Spring Boot версия 2.2.4 Облачная версия Hoxton.SR1
My Processor App
@SpringBootApplication
public class FunctionStreamSampleApplication {
public static void main(String[] args) {
SpringApplication.run(FunctionStreamSampleApplication.class, args);
}
@Bean
public Function<String, String> messenger() {
return data -> "Hello : " + data.toUpperCase() + "!";
}
}
Конфигурация процессора:
spring:
cloud:
stream:
function:
definition: messenger
bindings:
messenger-in-0: input
messenger-out-0: output
Consumer App
@SpringBootApplication
public class ConsumerStreamSampleApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerStreamSampleApplication.class, args);
}
@Bean
public Consumer<String> sample() {
return data -> System.out.println("Sink : " + data + "!");
}
}
Consumer Config
spring:
cloud:
stream:
function:
definition: sample
bindings:
sample-in-0: input
Я регистрирую приложения Processor и Sink в SCDF
Создать поток как: kafkaQ> customProcessor | customSink
При развертывании потока модуль Sink не запускается. Custom Sink имеет следующее сообщение об ошибке в журнале.
c.f.c.c.BeanFactoryAwareFunctionRegistry : Looking up function 'messenger' with acceptedOutputTypes: []
c.f.c.c.BeanFactoryAwareFunctionRegistry : !!! Failed to discover function 'messenger' in function catalog. Function available in catalog are: [sample, functionRouter]
Это же приложение-приемник запускает и потребляет сообщение в журнале о выходе процессора, если я переименую привязку функции потребителя, а также в "messenger"
@Bean
public Consumer<String> messenger() {
return data -> System.out.println("Sink : " + data + "!");
}
Можно ли узнать, Существуют и другие свойства конфигурации, которые необходимо изменить, чтобы у разных процессоров и приемников не было одинаковых имен.