У меня есть DStream[RawWeatherData]
объект с именем parsedWeatherStream
. Каждый объект в RawWeatherData
классе будет выглядеть так:
( "725030: 14732", 2008,1,1 , 1,5.0, -3.3,1020.6,290,4.1,2,0.0, 0,2 , 0,0)
Параметры вышеуказанного объекта: (wsid, year, month, day, hour, temperature, dewpoint, pressure, windDirection, windSpeed, skyCondition, oneHourPrecip, sixHourPrecip)
Подобные объекты - это то, что я буду получать в контексте потоковой передачи Spark от Кафки.
Моя конечная цель - сложить все значения oneHourPrecip
для всех объектов в DStream
(выделено в объекте, выделенном жирным шрифтом справа), с условием проверки, соответствуют ли значения, выделенные жирным шрифтом слева (wsid, year, month, day)
, при сравнении двух объектов в потоке, одинаковы или нет.
Для этого я смог сделать карту из parsedStream, вынимая только нужные мне объекты:
val newStream = parsedWeatherStream.map { weather =>
(weather.wsid, weather.year, weather.month, weather.day, weather.oneHourPrecip)
}
Теперь я не уверен, какую технику использовать, чтобы суммировать все значения oneHourPrecip
. Я попытался сделать условное сокращение, но это приводит к ошибке:
// ERROR: Type mismatch
val transformedStream = newStream.reduce{(a , b) => {
if (a._1 == b._1 && a._2 == b._2 && a._3 == b._3 && a._4 == b._4)
(a._1 , a._2 , a._3 , a._4 , a._5 + b._5)
else
None //Would like to do Nothing here, so returning None
}}
Я также изучил операцию transform
, но это также, похоже, не помогает моему делу.