всем.
У меня есть структурированный стриминг в Delta Lake. Моя последняя таблица должна подсчитывать, сколько идентификаторов (уникальных) получает доступ к платформе в неделю.
Я группирую данные по неделям в потоковой передаче, однако я не могу подсчитать уникальные значения идентификаторов в другом столбце, и я продолжаю получать счет всей группы, даже если вместо этого повторяется.
Я пробовал сгруппировать данные дважды, по неделям, а затем по идентификатору устройства. Я пробовал dropDuplicate (). Пока ничего не вышло.
Может кто-нибудь объяснить мне, что мне не хватает?
Мой код:
from pyspark.sql.functions import weekofyear, col
def silverToGold(silverPath, goldPath, queryName):
(spark.readStream
.format("delta")
.load(silverPath)
.withColumn("week", weekofyear("client_event_time"))
.groupBy(col("week"))
.count()
.select(col("week"),col("count").alias("WAU"))
.writeStream
.format("delta")
.option("checkpointLocation", goldPath + "/_checkpoint")
.queryName(queryName)
.outputMode("complete")
.start(goldPath))