Как выполнить агрегацию без сохранения состояния в spark с использованием Structured Streaming 2.3.0 без использования flatMapsGroupWithState? - PullRequest
0 голосов
/ 05 мая 2018

Как выполнить агрегацию без сохранения состояния в Spark с использованием структурированной потоковой передачи 2.3.0 без использования flatMapsGroupWithState или Dstream API? ищу более декларативный способ

Пример:

select count(*) from some_view

Я хочу, чтобы выходные данные просто подсчитывали все записи, доступные в каждой партии, но не суммировали с предыдущей партией

1 Ответ

0 голосов
/ 07 мая 2018

Чтобы выполнить агрегацию без сохранения состояния в Spark с использованием Structured Streaming 2.3.0 без использования flatMapsGroupWithState или Dstream API, вы можете использовать следующий код:

import spark.implicits._

def countValues = (_: String, it: Iterator[(String, String)]) => it.length

val query =
  dataStream
    .select(lit("a").as("newKey"), col("value"))
    .as[(String, String)]
    .groupByKey { case(newKey, _) => newKey }
    .mapGroups[Int](countValues)
    .writeStream
    .format("console")
    .start()

Вот что мы делаем -

  1. Мы добавили один столбец к нашему datastream - newKey. Мы сделали это так, чтобы мы могли сделать groupBy поверх него, используя groupByKey. Я использовал буквальную строку "a", но вы можете использовать что угодно. Кроме того, вам нужно выбрать любой столбец из доступных столбцов в datastream. Для этой цели я выбрал столбец value, вы можете выбрать любого.
  2. Мы создали функцию отображения - countValues, чтобы подсчитать значения, агрегированные функцией groupByKey, написав it.length.

Таким образом, таким образом, мы можем подсчитать любые записи, доступные в каждом пакете, но не агрегируя из предыдущего пакета.

Надеюсь, это поможет!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...