В Spark структурированные потоки, как вывести полные агрегаты во внешний источник, такой как REST-сервис - PullRequest
0 голосов
/ 20 мая 2018

Задача, которую я пытаюсь выполнить, состоит в агрегировании подсчета значений из измерения (поля) в DataFrame, выполнении некоторой статистики, такой как среднее, максимальное, минимальное и т. Д., А затем вывод агрегатов во внешнюю систему с помощью API.вызов.Я использую водяные знаки, скажем, 30 секунд с размером окна 10 секунд.Я сделал эти размеры маленькими, чтобы мне было проще тестировать и отлаживать систему.

Единственный метод, который я нашел для выполнения вызовов API, - это использование ForeachWriter.Моя проблема в том, что ForeachWriter выполняется на уровне раздела и производит только агрегат на раздел.До сих пор я не нашел способа получить свернутые агрегаты, кроме как объединиться в 1, что является способом замедления для моего потокового приложения.

Я обнаружил, что если я использую приемник на основе файлов, такойкак писатель Паркет HDFS, что код производит реальные агрегаты.Это также работает очень хорошо.Что мне действительно нужно, так это достичь того же результата, но вызывать API, а не записывать в файловую систему.

Кто-нибудь знает, как это сделать?

Я пробовал это в Spark 2.2.2 и Spark 2.3 и получим такое же поведение.

Вот упрощенный фрагмент кода, иллюстрирующий то, что я пытаюсь сделать:

val valStream = streamingDF
  .select(
    $"event.name".alias("eventName"),
    expr("event.clientTimestamp / 1000").cast("timestamp").as("eventTime"),
    $"asset.assetClass").alias("assetClass")
  .where($"eventName" === 'MyEvent')
  .withWatermark("eventTime", "30 seconds")
  .groupBy(window($"eventTime", "10 seconds", $"assetClass", $"eventName")
  .agg(count($"eventName").as("eventCount"))
  .select($"window.start".as("windowStart"), $"window.end".as("windowEnd"), $"assetClass".as("metric"), $"eventCount").as[DimAggregateRecord]
  .writeStream
  .option("checkpointLocation", config.checkpointPath)
  .outputMode(config.outputMode)

val session = (if(config.writeStreamType == AbacusStreamWriterFactory.S3) {
    valStream.format(config.outputFormat)
    .option("path", config.outputPath)
  }
  else {
    valStream.foreach(--- this is my DimAggregateRecord ForEachWriter ---)
  }).start()

1 Ответ

0 голосов
/ 19 июня 2018

Я ответил на свой вопрос.Я обнаружил, что перераспределение по времени начала окна сделало свое дело.Это перетасовывает данные так, что все строки с одинаковой группой и временем запуска WindowStart находятся на одном исполнителе.Приведенный ниже код создает файл для каждого интервала окна группы.Это также работает довольно хорошо.У меня нет точных чисел, но он дает агрегаты за меньшее время, чем интервал в 10 секунд.

val valStream = streamingDF
  .select(
    $"event.name".alias("eventName"),
    expr("event.clientTimestamp / 1000").cast("timestamp").as("eventTime"),
    $"asset.assetClass").alias("assetClass")
  .where($"eventName" === 'MyEvent')
  .withWatermark("eventTime", "30 seconds")
  .groupBy(window($"eventTime", "10 seconds", $"assetClass", $"eventName")
  .agg(count($"eventName").as("eventCount"))
  .select($"window.start".as("windowStart"), $"window.end".as("windowEnd"), $"assetClass".as("metric"), $"eventCount").as[DimAggregateRecord]

  .repartition($"windowStart")  // <-------- this line produces the desired result

  .writeStream
  .option("checkpointLocation", config.checkpointPath)
  .outputMode(config.outputMode)

val session = (if(config.writeStreamType == AbacusStreamWriterFactory.S3) {
    valStream.format(config.outputFormat)
    .option("path", config.outputPath)
  }
  else {
    valStream.foreach(--- this is my DimAggregateRecord ForEachWriter ---)
  }).start()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...