Удалять дубликаты с течением времени в pyspark - PullRequest
0 голосов
/ 21 апреля 2020

У меня есть фрейм потоковых данных в режиме точного чтения из kafka topi c, и я хочу отбрасывать дубликаты в течение последних 5 минут каждый раз, когда анализируется новая запись.

Мне известно о dropDuplicates(["uid"]) функция, я просто не уверен, как проверить наличие дубликатов за указанный c исторический c интервал времени.

Насколько я понимаю, следующее:

df = df.dropDuplicates(["uid"])

либо работает с данными, считанными из текущей (микро) партии, или же с «чем угодно», находящимся сейчас в памяти. Есть ли способ установить время для этой дедупликации, используя столбец "timestamp" в данных?

Заранее спасибо.

1 Ответ

1 голос
/ 21 апреля 2020
df\
  .withWatermark("event_time", "5 seconds")\
  .dropDuplicates(["User", "uid"])\
  .groupBy("User")\
  .count()\
  .writeStream\
  .queryName("pydeduplicated")\
  .format("memory")\
  .outputMode("complete")\
  .start()

для получения дополнительной информации вы можете обратиться, https://databricks.com/blog/2017/10/17/arbitrary-stateful-processing-in-apache-sparks-structured-streaming.html

...