При использовании KTable потоки Kafka не позволяют экземплярам читать из нескольких разделов определенной темы, когда количество экземпляров / потребителей равно количеству разделов.Я пытался добиться этого с помощью GlobalKTable, проблема в том, что данные будут перезаписаны, к ним также нельзя применить агрегирование.
Давайте предположим, что у меня есть тема с именем «data_in» с 3 разделами (P1, P2,Р3).Когда я запускаю 3 экземпляра (I1, I2, I3) потокового приложения Kafka, я хочу, чтобы каждый экземпляр считывал данные со всех разделов «data_in».Я имею в виду, что I1 может читать из P1, P2 и P3, I2 может читать из P1, P2 и P3, I2 и так далее.
РЕДАКТИРОВАТЬ: Имейте в виду, что производитель может опубликовать двапохожие идентификаторы на два разных раздела в data_in.Поэтому при запуске двух разных экземпляров GlobalKtable будет перезаписан.
Пожалуйста, как этого добиться?Это часть моего кода
private KTable<String, theDataList> globalStream() {
// KStream of records from data-in topic using String and theDataSerde deserializers
KStream<String, Data> trashStream = getBuilder().stream("data_in",Consumed.with(Serdes.String(), SerDes.theDataSerde));
// Apply an aggregation operation on the original KStream records using an intermediate representation of a KStream (KGroupedStream)
KGroupedStream<String, Data> KGS = trashStream.groupByKey();
Materialized<String, theDataList, KeyValueStore<Bytes, byte[]>> materialized = Materialized.as("agg-stream-store");
materialized = materialized.withValueSerde(SerDes.theDataDataListSerde);
// Return a KTable
return KGS.aggregate(() -> new theDataList(), (key, value, aggregate) -> {
if (!value.getValideData())
aggregate.getList().removeIf((t) -> t.getTimestamp() <= value.getTimestamp());
else
aggregate.getList().add(value);
return aggregate;
}, materialized);
}