У меня есть потоковые данные, поступающие из кафки в dataFrame.Я хочу удалить дубликаты на основе идентификатора и сохранить последние записи на основе отметки времени.
Пример данных выглядит следующим образом:
Id Name count timestamp
1 Vikas 20 2018-09-19T10:10:10
2 Vijay 50 2018-09-19T10:10:20
3 Vilas 30 2018-09-19T10:10:30
4 Vishal 10 2018-09-19T10:10:40
1 Vikas 50 2018-09-19T10:10:50
4 Vishal 40 2018-09-19T10:11:00
1 Vikas 10 2018-09-19T10:11:10
3 Vilas 20 2018-09-19T10:11:20
Ожидаемый вывод будет:
Id Name count timestamp
1 Vikas 10 2018-09-19T10:11:10
2 Vijay 50 2018-09-19T10:10:20
3 Vilas 20 2018-09-19T10:11:20
4 Vishal 40 2018-09-19T10:11:00
Старые дубликаты удаляются, и только последние записи сохраняются на основе поля отметки времени.
Я использую водяные знаки для поля отметки времени.Я попытался использовать «df.removeDuplicate», но он сохраняет старые записи без изменений, и все новое удаляется.
Текущий код выглядит следующим образом:
df = df.withWatermark("timestamp", "1 Day").dropDuplicates("Id", "timestamp")
Как мы можем реализовать пользовательский метод дедупликации такчто мы можем хранить последние записи как уникальные записи?
Любая помощь приветствуется.