Использование Spark SQL с Spark Streaming - PullRequest
0 голосов
/ 27 сентября 2018

Попытка разобраться в SparkSql относительно структурированной потоковой передачи Spark.Spark Session читает события из раздела kafka, собирает данные в счетчики, сгруппированные по различным именам столбцов, и выводит их на консоль.Необработанные входные данные структурированы следующим образом:

+--------------+--------------------+----------+----------+-------+-------------------+--------------------+----------+
|.  sourceTypes|                Guid|  platform|datacenter|pagesId|     eventTimestamp|              Id1234|  Id567890|
+--------------+--------------------+----------+----------+-------+-------------------+--------------------+----------+
| Notififcation|....................|   ANDROID|       dev|     aa|2018-09-27 09:41:29|fce81f05-a085-392...|{"id":...|
| Notififcation|....................|   ANDROID|       dev|     ab|2018-09-27 09:41:29|fce81f05-a085-392...|{"id":...|
| Notififcation|....................|     WEBOS|       dev|     aa|2018-09-27 09:42:46|0ee089c1-d5da-3b3...|{"id":...|
| Notififcation|....................|     WEBOS|       dev|     aa|2018-09-27 09:42:48|57c18964-40c9-311...|{"id":...|
| Notififcation|....................|     WEBOS|       dev|     aa|2018-09-27 09:42:48|5ecf1d77-321a-379...|{"id":...|
| Notififcation|....................|     WEBOS|       dev|     aa|2018-09-27 09:42:48|5ecf1d77-321a-379...|{"id":...|
| Notififcation|....................|     WEBOS|       dev|     aa|2018-09-27 09:42:52|d9fc4cfa-0934-3e9...|{"id":...|
+--------------+--------------------+----------+----------+-------+-------------------+--------------------+---------+

Необходим счет для sourceTypes, platform, datacenter и pageId.

Агрегирование данных с помощью следующего кода:

Dataset<Row> query = sourceDataset
        .withWatermark("eventTimestamp", watermarkInterval)
        .select(
            col("eventTimestamp"),
            col("datacenter"),
            col("platform"),
            col("pageId")
        )
        .groupBy(
            window(col("eventTimestamp"), windowInterval),
            col("datacenter"),
            col("platform"),
            col("pageId")
        )
        .agg(
            max(col("eventTimestamp"))
        );

Здесь watermarkInterval=45seconds, windowInterval=15seconds & triggerInterval=15seconds.

Использование нового агрегированного набора данных с:

aggregatedDataset
        .writeStream()
        .outputMode(OutputMode.Append())
        .format("console")
        .trigger(Trigger.ProcessingTime(triggerInterval))
        .start();

Существует несколько проблем:

  1. Выходные данные не печатают счетчики для каждого groupBy, например, платформы, идентификатора страницы и т. Д.

  2. Как напечатать вывод в формате json?Я пытался использовать select(to_json(struct("*")).as("value")) при выводе данных на консоль, но это не работает.

1 Ответ

0 голосов
/ 27 сентября 2018

Вы можете решить свою проблему, используя следующий фрагмент кода:

.outputMode("complete")
...