Отправка сообщения из Kafka Streams в WebSocket с помощью Quarkus - PullRequest
1 голос
/ 13 июля 2020

Так что я не знаю, как это сделать. Я сделал это с помощью Quarkus и среды обмена сообщениями MicroProfile Reactive Messaging и библиотеки javax.websocket, но я не уверен, как я могу перенести это на использование Kafka Streams. С помощью MP Reactive Messaging я могу просто иметь аннотацию @Outgoing на канале в одном из моих других классов, а затем с моей службой WebSocket я могу вводить из этого канала вот так.

@ServerEndpoint("/validatedmessages")
@ApplicationScoped
public class WebSocket {

@Inject @Channel("post-final-check") Flowable<CustomMessage> finalizedMessages;
//private Jsonb jsonb;
ObjectMapper obj = new ObjectMapper(); 

private static final Logger LOGGER = Logger.getLogger(WebSocket.class);

private List<Session> sessions = new CopyOnWriteArrayList<>();
private Disposable subscription;

@OnOpen
public void onOpen(Session session) {
    sessions.add(session);
}

@OnClose
public void onClose(Session session) {
    sessions.remove(session);
}

@PostConstruct
public void subscribe() {
    subscription = finalizedMessages.subscribe(message -> sessions.forEach(session -> {
        try {
            write(session, message);
        } catch (JsonProcessingException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }));
}

@PreDestroy
public void cleanup() throws Exception {
    subscription.dispose();
    //jsonb.close();
}

Возможно ли это сделать это с Kafka Streams?

1 Ответ

0 голосов
/ 17 июля 2020

Где вы храните результаты работы Kafka Streams? В топиках или KTable?

Если он первый, то вы можете ввести Publisher или использовать @Incoming (topi c). В противном случае вам необходимо использовать InteractiveQueries, см. Здесь пример:

https://quarkus.io/guides/kafka-streams#interactive -запросы

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...