Задача, которую я пытаюсь выполнить, состоит в агрегировании подсчета значений из измерения (поля) в DataFrame, выполнении некоторой статистики, такой как среднее, максимальное, минимальное и т. Д., А затем вывод агрегатов во внешнюю систему с помощью API.вызов.Я использую водяные знаки, скажем, 30 секунд с размером окна 10 секунд.Я сделал эти размеры маленькими, чтобы мне было проще тестировать и отлаживать систему.
Единственный метод, который я нашел для выполнения вызовов API, - это использование ForeachWriter
.Моя проблема в том, что ForeachWriter
выполняется на уровне раздела и производит только агрегат на раздел.До сих пор я не нашел способа получить свернутые агрегаты, кроме как объединиться в 1, что является способом замедления для моего потокового приложения.
Я обнаружил, что если я использую приемник на основе файлов, такойкак писатель Паркет HDFS, что код производит реальные агрегаты.Это также работает очень хорошо.Что мне действительно нужно, так это достичь того же результата, но вызывать API, а не записывать в файловую систему.
Кто-нибудь знает, как это сделать?
Я пробовал это в Spark 2.2.2 и Spark 2.3 и получим такое же поведение.
Вот упрощенный фрагмент кода, иллюстрирующий то, что я пытаюсь сделать:
val valStream = streamingDF
.select(
$"event.name".alias("eventName"),
expr("event.clientTimestamp / 1000").cast("timestamp").as("eventTime"),
$"asset.assetClass").alias("assetClass")
.where($"eventName" === 'MyEvent')
.withWatermark("eventTime", "30 seconds")
.groupBy(window($"eventTime", "10 seconds", $"assetClass", $"eventName")
.agg(count($"eventName").as("eventCount"))
.select($"window.start".as("windowStart"), $"window.end".as("windowEnd"), $"assetClass".as("metric"), $"eventCount").as[DimAggregateRecord]
.writeStream
.option("checkpointLocation", config.checkpointPath)
.outputMode(config.outputMode)
val session = (if(config.writeStreamType == AbacusStreamWriterFactory.S3) {
valStream.format(config.outputFormat)
.option("path", config.outputPath)
}
else {
valStream.foreach(--- this is my DimAggregateRecord ForEachWriter ---)
}).start()