Условное сокращение на DStream - PullRequest
0 голосов
/ 07 мая 2019

У меня есть DStream[RawWeatherData] объект с именем parsedWeatherStream. Каждый объект в RawWeatherData классе будет выглядеть так:

( "725030: 14732", 2008,1,1 , 1,5.0, -3.3,1020.6,290,4.1,2,0.0, 0,2 , 0,0)

Параметры вышеуказанного объекта: (wsid, year, month, day, hour, temperature, dewpoint, pressure, windDirection, windSpeed, skyCondition, oneHourPrecip, sixHourPrecip)

Подобные объекты - это то, что я буду получать в контексте потоковой передачи Spark от Кафки.

Моя конечная цель - сложить все значения oneHourPrecip для всех объектов в DStream (выделено в объекте, выделенном жирным шрифтом справа), с условием проверки, соответствуют ли значения, выделенные жирным шрифтом слева (wsid, year, month, day), при сравнении двух объектов в потоке, одинаковы или нет.

Для этого я смог сделать карту из parsedStream, вынимая только нужные мне объекты:

val newStream = parsedWeatherStream.map { weather =>
    (weather.wsid, weather.year, weather.month, weather.day, weather.oneHourPrecip)
}

Теперь я не уверен, какую технику использовать, чтобы суммировать все значения oneHourPrecip. Я попытался сделать условное сокращение, но это приводит к ошибке:

// ERROR: Type mismatch
val transformedStream = newStream.reduce{(a , b) => {
    if (a._1 == b._1 && a._2 == b._2 && a._3 == b._3 && a._4 == b._4)
        (a._1 , a._2 , a._3 , a._4 , a._5 + b._5)
    else
        None //Would like to do Nothing here, so returning None
}}

Я также изучил операцию transform, но это также, похоже, не помогает моему делу.

1 Ответ

1 голос
/ 08 мая 2019

Для объекта newStream в моем вопросе выше я внес небольшое, но неуловимое изменение. Я добавил свои ключи (эти 4 значения) в парантезы, чтобы представить newStream как пару ключ-значение. Тогда все, что мне нужно было сделать - это reduceByKey.

val newStream = parsedWeatherStream.map { weather =>
    ((weather.wsid, weather.year, weather.month, weather.day), weather.oneHourPrecip)
}.reduceByKey{(a , b) =>
    a + b
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...