Это продолжение Чтение из ВСЕХ разделов в каждом экземпляре приложения
Что я пытаюсь достичь, это прочитать данные со всех разделов темы в каждомпример.Я попытался использовать GlobalKtable, который должен решить проблему.К сожалению, производитель может опубликовать два одинаковых идентификатора в двух разных разделах в data_in.Поэтому при запуске двух разных экземпляров GlobalKtable будет перезаписан.
Мой вопрос: как этого достичь?Можно ли предотвратить появление идентичных идентификаторов в разных разделах?Или, по крайней мере, как предотвратить их появление в двух разных случаях (я слышал о теме перераспределения после 1-го KStream в приведенном ниже коде)
Пожалуйста, поделитесь некоторыми отправными точками (идеи, шаги, учебник, кодфрагменты, ...)
фрагмент кода
private GlobalKTable<String, theDataList> globalStream() {
// KStream of records from data-in topic using String and theDataSerde deserializers
KStream<String, Data> trashStream = getBuilder().stream("data_in",Consumed.with(Serdes.String(), SerDes.theDataSerde));
// Apply an aggregation operation on the original KStream records using an intermediate representation of a KStream (KGroupedStream)
KGroupedStream<String, Data> KGS = trashStream.groupByKey();
Materialized<String, theDataList, KeyValueStore<Bytes, byte[]>> materialized = Materialized.as("agg-stream-store");
materialized = materialized.withValueSerde(SerDes.theDataDataListSerde);
// Return a KTable
KGS.aggregate(() -> new theDataList(), (key, value, aggregate) -> {
if (!value.getValideData())
aggregate.getList().removeIf((t) -> t.getTimestamp() <= value.getTimestamp());
else
aggregate.getList().add(value);
return aggregate;
}, materialized)
.to("agg_data_in");
return getBuilder().globalTable("agg_data_in");
}