Множественные агрегации и отличная функция в Spark структурированной потоковой передаче - PullRequest
0 голосов
/ 21 февраля 2020

Мне нужно сделать несколько агрегаций при потоковой передаче данных из Kafka и выводить первые 10 строк результатов на консоль каждые M секунд.

    input_df = (
       spark
       .readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", brokers)
       .option("subscribe", "page_views")
       .load()
       .selectExpr('cast(value as string)')
    )

    ...
    ...

    # info has 2 cols: domain, uid  (info = transformation of input_df)
    # It's an example of what I want to do (like in simple pyspark)
    stat = (
        info
        .groupby('domain')
        .agg(
             F.count(F.col('UID')).alias('view'),
             F.countDistinct(F.col('UID')).alias('unique')
        )
        .sort(F.col("view").desc())
        .limit(10)
    )

    query = (
        stat
        .writeStream
        .outputMode("complete")
        .format("console")
        .option("truncate", "true")
        .start()
    )

Этот пример без триггера времени, но я могу сделать это сам. Из-за того, что не разрешено использовать countDistinct, у меня нет идей для выполнения моего упражнения. Я попытался сделать 2 dfs для каждой агрегации (df_1 = (домен, представление), df_2 = (домен, уникальный)) и затем присоединиться к df_1 с df_2, но также не разрешено иметь несколько агрегаций. Так что это тупик для меня. Будет круто иметь решение за это.

Спасибо за внимание!

1 Ответ

0 голосов
/ 02 марта 2020

Вы можете сделать это с помощью flatMapGroupWithState, которая является произвольной функцией состояния. Кроме того, она поддерживает режим добавления и режим обновления.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...