Я пытаюсь создать простое приложение облачного потока с привязкой kafka.Позвольте мне описать установку.1. У меня есть продюсер по теме topic_1
.
2. Есть связыватель потока, связывающий topic_1
после некоторой обработки в topic_2
.
@StreamListener(MyBinder.INPUT)
@SendTo(MyBinder.OUTPUT_2)
public String handleIncomingMsgs(String s) {
logger.info(s); // prints all the messages
return s;
}
Когда производитель создает сообщения,
StreamListner handleIncomingMsgs
получает все сообщения. После получения он должен переслать сообщения на другой канал.
@Service
@EnableBinding(MyBinder.class)
public class LogMsg {
@StreamListener(MyBinder.OUTPUT_2)
public void handle(String board) {
logger.info("Received payload: " + board); //prints every alternate messages
}
Вот мой переплет
public interface ViewsStreams {
String INPUT = "input";
String OUTPUT_1 = "output_1";
String OP_USERS = "output_2";
@Autowired
@Input(INPUT)
SubscribableChannel job_board_views();
@Autowired
@Output(OUTPUT_1)
MessageChannel outboundJobBoards();
@Autowired
@Output(OUTPUT_2)
MessageChannel outboundUsers();
}
Я новичок в этих технологиях.Невозможно понять, что здесь происходит не так.Может кто-нибудь помочь, пожалуйста?