Строка, где выполняется агрегация .groupBy(window($"event_time", "10 seconds", "5 seconds"), $"section", $"timestamp")
, создает тип данных struct<start:timestamp,end:timestamp>
, который не поддерживается источником данных CSV.
Просто df_agg_without_time.printSchema
и вы увидите столбец.
Решение состоит в том, чтобы просто преобразовать его в какой-то другой более простой тип (возможно, с select
или withColumn
) или просто select
из него (т.е. не включить в следующий кадр данных).
Ниже приведен пример структурированного запроса (не потокового), который показывает схему, которую использует ваш потоковый структурированный запрос (при создании df_agg_without_time
).
val q = spark
.range(4)
.withColumn("t", current_timestamp)
.groupBy(window($"t", "10 seconds"))
.count
scala> q.printSchema
root
|-- window: struct (nullable = false)
| |-- start: timestamp (nullable = true)
| |-- end: timestamp (nullable = true)
|-- count: long (nullable = false)
Для примера потокового запроса вы можете использовать источник данных о скорости.
val q = spark
.readStream
.format("rate")
.load
.groupBy(window($"timestamp", "10 seconds"))
.count
scala> q.printSchema
root
|-- window: struct (nullable = false)
| |-- start: timestamp (nullable = true)
| |-- end: timestamp (nullable = true)
|-- count: long (nullable = false)