Попытка разобраться в SparkSql относительно структурированной потоковой передачи Spark.Spark Session читает события из раздела kafka, собирает данные в счетчики, сгруппированные по различным именам столбцов, и выводит их на консоль.Необработанные входные данные структурированы следующим образом:
+--------------+--------------------+----------+----------+-------+-------------------+--------------------+----------+
|. sourceTypes| Guid| platform|datacenter|pagesId| eventTimestamp| Id1234| Id567890|
+--------------+--------------------+----------+----------+-------+-------------------+--------------------+----------+
| Notififcation|....................| ANDROID| dev| aa|2018-09-27 09:41:29|fce81f05-a085-392...|{"id":...|
| Notififcation|....................| ANDROID| dev| ab|2018-09-27 09:41:29|fce81f05-a085-392...|{"id":...|
| Notififcation|....................| WEBOS| dev| aa|2018-09-27 09:42:46|0ee089c1-d5da-3b3...|{"id":...|
| Notififcation|....................| WEBOS| dev| aa|2018-09-27 09:42:48|57c18964-40c9-311...|{"id":...|
| Notififcation|....................| WEBOS| dev| aa|2018-09-27 09:42:48|5ecf1d77-321a-379...|{"id":...|
| Notififcation|....................| WEBOS| dev| aa|2018-09-27 09:42:48|5ecf1d77-321a-379...|{"id":...|
| Notififcation|....................| WEBOS| dev| aa|2018-09-27 09:42:52|d9fc4cfa-0934-3e9...|{"id":...|
+--------------+--------------------+----------+----------+-------+-------------------+--------------------+---------+
Необходим счет для sourceTypes
, platform
, datacenter
и pageId
.
Агрегирование данных с помощью следующего кода:
Dataset<Row> query = sourceDataset
.withWatermark("eventTimestamp", watermarkInterval)
.select(
col("eventTimestamp"),
col("datacenter"),
col("platform"),
col("pageId")
)
.groupBy(
window(col("eventTimestamp"), windowInterval),
col("datacenter"),
col("platform"),
col("pageId")
)
.agg(
max(col("eventTimestamp"))
);
Здесь watermarkInterval=45seconds
, windowInterval=15seconds
& triggerInterval=15seconds
.
Использование нового агрегированного набора данных с:
aggregatedDataset
.writeStream()
.outputMode(OutputMode.Append())
.format("console")
.trigger(Trigger.ProcessingTime(triggerInterval))
.start();
Существует несколько проблем:
Выходные данные не печатают счетчики для каждого groupBy
, например, платформы, идентификатора страницы и т. Д.
Как напечатать вывод в формате json?Я пытался использовать select(to_json(struct("*")).as("value"))
при выводе данных на консоль, но это не работает.