Так что я не знаю, как это сделать. Я сделал это с помощью Quarkus и среды обмена сообщениями MicroProfile Reactive Messaging и библиотеки javax.websocket, но я не уверен, как я могу перенести это на использование Kafka Streams. С помощью MP Reactive Messaging я могу просто иметь аннотацию @Outgoing на канале в одном из моих других классов, а затем с моей службой WebSocket я могу вводить из этого канала вот так.
@ServerEndpoint("/validatedmessages")
@ApplicationScoped
public class WebSocket {
@Inject @Channel("post-final-check") Flowable<CustomMessage> finalizedMessages;
//private Jsonb jsonb;
ObjectMapper obj = new ObjectMapper();
private static final Logger LOGGER = Logger.getLogger(WebSocket.class);
private List<Session> sessions = new CopyOnWriteArrayList<>();
private Disposable subscription;
@OnOpen
public void onOpen(Session session) {
sessions.add(session);
}
@OnClose
public void onClose(Session session) {
sessions.remove(session);
}
@PostConstruct
public void subscribe() {
subscription = finalizedMessages.subscribe(message -> sessions.forEach(session -> {
try {
write(session, message);
} catch (JsonProcessingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}));
}
@PreDestroy
public void cleanup() throws Exception {
subscription.dispose();
//jsonb.close();
}
Возможно ли это сделать это с Kafka Streams?