Привет, у меня есть 3 потока событий, которые я хочу объединить, используя потоки Kafka.
Я не могу найти простое решение для решения проблемы параллелизма:
// merged values Ktable
KTable<String, ProdForecastPowerPlantAgg> mergedTable = builder.table(
getMergedValuesTopic(),
[...]);
// records A
// stream
KStream<String, RecordA> recordsAStream = builder.stream(
getRecordATopic(),
[...]);
// rekeyed stream
KStream<String, ProductionRecordValue> recordsABySomeId = recordsAStream
.selectKey((k, v) -> getKey(v);
// records B
// stream
KStream<String, RecordB> recordsBStream = builder.stream(
getRecordBTopic(),
[...]);
// rekeyed stream
KStream<String, RecordB> recordsBBySomeId = recordsBStream
.selectKey((k, v) -> getKey(v);
// records C
// stream
KStream<String, RecordA> recordsCStream = builder.stream(
getRecordCTopic(),
[...]);
// rekeyed stream
KStream<String, ProductionRecordValue> recordsCBySomeId = recordsCStream
.selectKey((k, v) -> getKey(v);
// when a recordA arrives
KStream<String, RecordA> aggFromA = recordsABySomeId
.filter((k, v) -> v != null)
// join recordA and current join result together
.leftJoin(mergedTable, (recA, oldMerge) -> {
if (oldMerge != null) {
return new Merge(recA, oldMerge.B, oldMerge.C);
}
return new Merge(recA, null, null)
},
[...]
);
// when a recordB arrives
KStream<String, RecordB> aggFromB = recordsBBySomeId
.filter((k, v) -> v != null)
// join recordB and current join result together
.leftJoin(mergedTable, (recB, oldMerge) -> {
if (oldMerge != null) {
return new Merge(oldMerge.A, recB, oldMerge.C);
}
return new Merge(null, recB, null)
},
[...]
);
// when a recordC arrives
KStream<String, RecordB> aggFromC = recordsCBySomeId
.filter((k, v) -> v != null)
// join recordB and current join result together
.leftJoin(mergedTable, (recC, oldMerge) -> {
if (oldMerge != null) {
return new Merge(oldMerge.A, oldMerge.B, recC);
}
return new Merge(null, null, recC)
},
[...]
);
// save aggreagtion
aggFromA.merge(aggFromB).merge(aggFromC)
.to(getMergedValuesTopic(), Produced.with(Serdes.String(), aggSerdes));
return builder.build();
Действительно, этот фрагмент недействителен: таблица KTable на основе getMergedValuesTopic не отражает последнее состояние слияния , когда поиск выполнен:
когда две разные записи приходят одновременно, одно обновление может отменить другое (так как поиск устарел).
У кого-нибудь есть простое решение этой проблемы с использованием потоков Кафки?