Мне нужно сделать несколько агрегаций при потоковой передаче данных из Kafka и выводить первые 10 строк результатов на консоль каждые M секунд.
input_df = (
spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("subscribe", "page_views")
.load()
.selectExpr('cast(value as string)')
)
...
...
# info has 2 cols: domain, uid (info = transformation of input_df)
# It's an example of what I want to do (like in simple pyspark)
stat = (
info
.groupby('domain')
.agg(
F.count(F.col('UID')).alias('view'),
F.countDistinct(F.col('UID')).alias('unique')
)
.sort(F.col("view").desc())
.limit(10)
)
query = (
stat
.writeStream
.outputMode("complete")
.format("console")
.option("truncate", "true")
.start()
)
Этот пример без триггера времени, но я могу сделать это сам. Из-за того, что не разрешено использовать countDistinct, у меня нет идей для выполнения моего упражнения. Я попытался сделать 2 dfs для каждой агрегации (df_1 = (домен, представление), df_2 = (домен, уникальный)) и затем присоединиться к df_1 с df_2, но также не разрешено иметь несколько агрегаций. Так что это тупик для меня. Будет круто иметь решение за это.
Спасибо за внимание!