Чтобы выполнить агрегацию без сохранения состояния в Spark с использованием Structured Streaming 2.3.0 без использования flatMapsGroupWithState
или Dstream API, вы можете использовать следующий код:
import spark.implicits._
def countValues = (_: String, it: Iterator[(String, String)]) => it.length
val query =
dataStream
.select(lit("a").as("newKey"), col("value"))
.as[(String, String)]
.groupByKey { case(newKey, _) => newKey }
.mapGroups[Int](countValues)
.writeStream
.format("console")
.start()
Вот что мы делаем -
- Мы добавили один столбец к нашему
datastream
- newKey
. Мы сделали это так, чтобы мы могли сделать groupBy
поверх него, используя groupByKey
. Я использовал буквальную строку "a"
, но вы можете использовать что угодно. Кроме того, вам нужно выбрать любой столбец из доступных столбцов в datastream
. Для этой цели я выбрал столбец value
, вы можете выбрать любого.
- Мы создали функцию отображения -
countValues
, чтобы подсчитать значения, агрегированные функцией groupByKey
, написав it.length
.
Таким образом, таким образом, мы можем подсчитать любые записи, доступные в каждом пакете, но не агрегируя из предыдущего пакета.
Надеюсь, это поможет!