Проблема: Как объединить поток, созданный из TOPIC_2 (на шаге 2), в таблицу состояний KTable (на шаге 1 формата).
Цель: после операции соединения, если мы изменим состояние объекта AlarmState (значение KTable stateTable), то же состояние должно быть отражено в stateTable (часть шага 1)
Существует KTable (как stateTable), описанный в шаге 1 (созданный из TOPIC_1)
Существует еще одна тема TOPIC_2, где создаются данные (на шаге 2)
Ключ stateTable и сгенерированные данные в TOPIC_2 совпадают
Шаг1.
final KStream<String, MetricBasicMessage> basicMsgStream = builder.stream("TOPIC_1",
Consumed.with(Serdes.String(), new JSONSerde<>()));
KTable <String, AlarmState> stateTable =
builder.stream("TOPIC_1",Consumed.with(Serdes.String(), new JSONSerde<>()))
.flatMapValues(...)
.filter(...)
.map(...)
.groupByKey(...)
.aggregate(...);
final KafkaStreams streams = new KafkaStreams(builder.build(), <streamsConfiguration>);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
Шаг2.
String keyToJoinWithState = key.substring(0, index);
producer.send("TOPIC_2", keyToJoinWithState, new NotificationMessage(taskType, thresh),"NOTIIFCATION_MESSAGE");