Как объединить поток, созданный из одной темы, в производную KTable (как агрегатную операцию) из другой темы - PullRequest
0 голосов
/ 17 апреля 2019

Проблема: Как объединить поток, созданный из TOPIC_2 (на шаге 2), в таблицу состояний KTable (на шаге 1 формата).

Цель: после операции соединения, если мы изменим состояние объекта AlarmState (значение KTable stateTable), то же состояние должно быть отражено в stateTable (часть шага 1)

Существует KTable (как stateTable), описанный в шаге 1 (созданный из TOPIC_1) Существует еще одна тема TOPIC_2, где создаются данные (на шаге 2) Ключ stateTable и сгенерированные данные в TOPIC_2 совпадают

Шаг1.

final KStream<String, MetricBasicMessage> basicMsgStream = builder.stream("TOPIC_1",
                Consumed.with(Serdes.String(), new JSONSerde<>()));

KTable <String, AlarmState> stateTable = 
         builder.stream("TOPIC_1",Consumed.with(Serdes.String(), new JSONSerde<>()))
                .flatMapValues(...)
                .filter(...)
                .map(...)
                .groupByKey(...)
                .aggregate(...);

final KafkaStreams streams = new KafkaStreams(builder.build(), <streamsConfiguration>);
        streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

Шаг2.

String keyToJoinWithState = key.substring(0, index);

producer.send("TOPIC_2", keyToJoinWithState, new NotificationMessage(taskType, thresh),"NOTIIFCATION_MESSAGE");

1 Ответ

1 голос
/ 17 апреля 2019

Если вы хотите присоединиться к потоку с какой-то таблицей, вам нужно просто позвонить

KStream::join(final KTable<K, VT> table, final ValueJoiner<? super V, ? super VT, ? extends VR> joiner);

Это будет примерно так:

KStream<String, String> stream2 = builder.<String, NotificationMessage >stream("TOPIC_2", Consumed.with(Serdes.String(), new NotificationMessageSerdes()));
stream2.join(stateTable, (v1, v2) -> ??? /* How to join values from Stream and KTable */).to("output2");
...