Структурированная потоковая пользовательская дедупликация - PullRequest
0 голосов
/ 19 сентября 2018

У меня есть потоковые данные, поступающие из кафки в dataFrame.Я хочу удалить дубликаты на основе идентификатора и сохранить последние записи на основе отметки времени.

Пример данных выглядит следующим образом:

Id  Name    count   timestamp
1   Vikas   20      2018-09-19T10:10:10
2   Vijay   50      2018-09-19T10:10:20
3   Vilas   30      2018-09-19T10:10:30
4   Vishal  10      2018-09-19T10:10:40
1   Vikas   50      2018-09-19T10:10:50
4   Vishal  40      2018-09-19T10:11:00
1   Vikas   10      2018-09-19T10:11:10
3   Vilas   20      2018-09-19T10:11:20

Ожидаемый вывод будет:

Id  Name    count   timestamp
1   Vikas   10      2018-09-19T10:11:10
2   Vijay   50      2018-09-19T10:10:20
3   Vilas   20      2018-09-19T10:11:20
4   Vishal  40      2018-09-19T10:11:00

Старые дубликаты удаляются, и только последние записи сохраняются на основе поля отметки времени.

Я использую водяные знаки для поля отметки времени.Я попытался использовать «df.removeDuplicate», но он сохраняет старые записи без изменений, и все новое удаляется.

Текущий код выглядит следующим образом:

df = df.withWatermark("timestamp", "1 Day").dropDuplicates("Id", "timestamp")

Как мы можем реализовать пользовательский метод дедупликации такчто мы можем хранить последние записи как уникальные записи?

Любая помощь приветствуется.

1 Ответ

0 голосов
/ 19 сентября 2018

Прежде чем отбрасывать дубликаты, сначала отсортируйте столбец отметки времени.

df.withWatermark("timestamp", "1 Day")
  .sort($"timestamp".desc)
  .dropDuplicates("Id", "timestamp")
...