Попытка объединить два потока по полю временной отметки в полезной нагрузке сообщений приводит к дублированию сообщений в теме вывода.Этого не ожидал.Как я могу избежать этого дублирования?
Я использую топологию DSL, которая использует два потока из двух тем.Поскольку я делаю некоторые сопоставления для каждого потока, это приводит к двум дополнительным темам.Наконец, после объединения пятая тема заполняется результатами, и эта тема показывает дублированные сообщения.Я проверил, что остальные четыре темы не содержат дубликатов.Я также заметил, что функция, которую я предоставляю функции соединения потоков kafka, вызывается неоднократно.Эта функция уже показывает, что происходит дублирование.
KStream<String, MappedOriginalSensorData> flattenedOriginalData = originalData
.flatMap(flattenOriginalData())
.through("mapped-original-sensor-data", Produced.with(Serdes.String(), new MappedOriginalSensorDataSerde()));
KStream<String, MappedErrorScoreData> enrichedErrorData = errorScoreData
.map(enrichWithModelAndAlgorithmAndReduceKey())
.through("mapped-error-score-data-repartition", Produced.with(Serdes.String(), new MappedErrorScoreDataSerde()));
return enrichedErrorData
//#3. Join
.join(flattenedOriginalData, join(),
JoinWindows.of(Duration.ofMillis(1).toMillis()), Joined.with(Serdes.String(), new MappedErrorScoreDataSerde(), new MappedOriginalSensorDataSerde()))
//#4. set key
.selectKey((k,v) -> v.getOriginalKey())
//#5. Map removing the originalKey field)
.mapValues(removeOriginalKeyField())
.through("joined-data-repartition");
Я ожидал, что в разделе «Перераспределение объединенных данных» будут показаны только уникальные сообщения на основе полезной нагрузки:
{
"timestamp": 1556626280000,
"errorSignal": 84.98,
"originalSignal": 36
}
Key:
1234:a:v2:nord::TE7
Timestamp:
Apr 30th, 2019 14:11:20.00
Offset:
3629
Partition:
0
и
{
"timestamp": 1556626280000,
"errorSignal": 84.98,
"originalSignal": 36
}
Key:
1234:a:v2:nord::TE7
Timestamp:
Apr 30th, 2019 14:11:20.00
Offset:
3628
Partition:
0
см. Смещение