После присоединения тема содержит повторяющиеся сообщения - PullRequest
0 голосов
/ 01 мая 2019

Попытка объединить два потока по полю временной отметки в полезной нагрузке сообщений приводит к дублированию сообщений в теме вывода.Этого не ожидал.Как я могу избежать этого дублирования?

Я использую топологию DSL, которая использует два потока из двух тем.Поскольку я делаю некоторые сопоставления для каждого потока, это приводит к двум дополнительным темам.Наконец, после объединения пятая тема заполняется результатами, и эта тема показывает дублированные сообщения.Я проверил, что остальные четыре темы не содержат дубликатов.Я также заметил, что функция, которую я предоставляю функции соединения потоков kafka, вызывается неоднократно.Эта функция уже показывает, что происходит дублирование.

KStream<String, MappedOriginalSensorData> flattenedOriginalData = originalData
                .flatMap(flattenOriginalData())
                .through("mapped-original-sensor-data", Produced.with(Serdes.String(), new MappedOriginalSensorDataSerde()));

        KStream<String, MappedErrorScoreData> enrichedErrorData = errorScoreData
                .map(enrichWithModelAndAlgorithmAndReduceKey())
                .through("mapped-error-score-data-repartition", Produced.with(Serdes.String(), new MappedErrorScoreDataSerde()));


        return enrichedErrorData
                //#3. Join
                .join(flattenedOriginalData, join(),
                        JoinWindows.of(Duration.ofMillis(1).toMillis()), Joined.with(Serdes.String(), new MappedErrorScoreDataSerde(), new MappedOriginalSensorDataSerde()))
                //#4. set key
                .selectKey((k,v) -> v.getOriginalKey())
                //#5. Map removing the originalKey field)
                .mapValues(removeOriginalKeyField())
                .through("joined-data-repartition");

Я ожидал, что в разделе «Перераспределение объединенных данных» будут показаны только уникальные сообщения на основе полезной нагрузки:

{
  "timestamp": 1556626280000,
  "errorSignal": 84.98,
  "originalSignal": 36
}
Key:
1234:a:v2:nord::TE7
Timestamp:
Apr 30th, 2019 14:11:20.00
Offset:
3629
Partition:
0

и

{
  "timestamp": 1556626280000,
  "errorSignal": 84.98,
  "originalSignal": 36
}
Key:
1234:a:v2:nord::TE7
Timestamp:
Apr 30th, 2019 14:11:20.00
Offset:
3628
Partition:
0

см. Смещение

1 Ответ

0 голосов
/ 03 мая 2019

Я обнаружил, что некоторые оставшиеся государственные хранилища на моем локальном диске вызвали дублирование сообщений.То есть я несколько раз повторял свои тесты, стирая хранилище kafka, но не знал о хранилищах состояний на локальном диске.Каждый раз, когда я повторял тестирование (с теми же сообщениями) сообщений, которые были объединены с содержимым в государственных хранилищах, что вызывало дублирование сообщений.

...