У меня есть простые @Bean
(Java 8 функций), которые отображаются на адрес назначения topic
(-out
и -in
).
@Bean
public Function<String, String> transform() {
return payload -> payload.toUpperCase();
}
@Bean
public Consumer<String> receive() {
return payload -> logger.info("Data received: " + payload);
}
.yml config :
spring:
cloud:
stream:
function:
definition: transform;receive
bindings:
transform-out-0:
destination: myTopic
receive-in-0:
destination: myTopic
Теперь я хочу вызвать функцию transform
с помощью вызова REST
, чтобы ее вывод шел в destination topic
(т.е. transform-out-0
сопоставлен с myTopic
) и забирается consumer
из этого пункта назначения (receive-in-0
сопоставлен с myTopic
). По сути, каждый вызов REST должен порождать новый экземпляр KAFKA Producer
и закрывать его.
Как я могу этого добиться, используя spring-cloud-stream
?
Спасибо
Ангшуман