Spark Структурированная потоковая передача - dropDuplicates с альтернативным решением водяного знака - PullRequest
0 голосов
/ 05 декабря 2018

Я пытаюсь дедуплицировать при потоковой передаче данных, используя функцию dropDuplicate с водяным знаком.Проблема, с которой я сталкиваюсь в настоящее время, заключается в том, что для данной записи у меня есть две отметки времени

  1. Одна - это отметка времени события - отметка времени создания записи из источника.
  2. Другая - это передачаtimestamp - временная метка из промежуточного процесса, который отвечает за потоковую передачу данных.

Дубликаты вводятся на промежуточном этапе, поэтому для данного дубликата записи временная метка события такая же, а временная метка передачи отличается.

Для водяного знака мне нравится использовать отметку времени передачи, потому что я знаю, что дубликаты не могут появляться более 3 минут друг от друга при передаче.Но я не могу использовать его в dropDuplicate, потому что он не будет захватывать дубликаты, поскольку дубликаты имеют разную метку времени передачи.

Вот пример,

Event 1:{ "EventString":"example1", "Eventtimestamp": "2018-11-29T10:00:00.00", "TransferTimestamp": "2018-11-29T10:05:00.00" }
Event 2 (duplicate): {"EventString":"example1", "Eventtimestamp": "2018-11-29T10:00:00.00", "TransferTimestamp": "2018-11-29T10:08:00.00"}

В этом случае дубликат был создан во времяпередача через 3 минуты от исходного события

Мой код подобен приведенному ниже,

streamDataset.
.withWatermark("transferTimestamp", "4 minutes")
.dropDuplicates("eventstring","transferTimestamp");

Приведенный выше код не удалит дубликаты, так как TransferTimestamp уникален для события и его дубликата.Но в настоящее время это единственный способ, так как Spark заставляет меня включать столбец водяных знаков в функцию dropDuplicates, когда установлен водяной знак.

Мне бы очень хотелось увидеть реализацию dropDuplicate, подобную приведенной ниже, которая была бы верным случаем длялюбые хотя бы один раз потоки семантики, где мне не нужно использовать поле водяного знака в dropDuplicates, и тем не менее соблюдение состояния на основе водяного знака выполняется.Но в настоящее время дело обстоит иначе

streamDataset.
.withWatermark("transferTimestamp", "4 minutes")
.dropDuplicates("eventstring");

Я не могу использовать метку timetime, поскольку она не упорядочена, а диапазон времени сильно меняется (отложенные события и нежелательные события).

Если у кого-то есть альтернативаРешение или идеи для дедупликации в таком сценарии, пожалуйста, дайте мне знать.

...