Я новичок в ie в Spark и особенно в Scala, поэтому любая помощь приветствуется. У меня есть Iterable класса case X, который является параметром для функции обновления состояния, в которой состояние определяется Tuple2 со строкой и классом case Y.
val updateState = (id: String, Xs: Option[Iterable[X]], state: State[Tuple2[String,Y]]) {
...
}
, где определения классы case X и Y следующие:
case class X(Elem1: String, Elem2: int, Elem3: Date, Elem4: Double, Elem5: String, Elem6: String)
case class Y(Elem1: String, Elem2: Double, Elem3: Double, Elem4: Double, Elem5: Int)
Допустим, класс case Y будет хранить значения результатов для некоторых операций, которые должны применяться поэлементно в значениях атрибутов для всех классов X принадлежащий к тому же Id; чтобы лучше это представить, позвольте мне использовать пример с моим форматом DStream:
val DStream = (Id1,ArrayBuffer(X(Id1,intA,date1,double1,xxxxx,yyyy)))
(Id2,ArrayBuffer(X(Id2,intB,date2,double2,xxxxx,yyyy),
X(Id2,intC,date2,double2,xxxxx,yyyy),
X(Id2,intC,date2,double2,xxxxx,yyyy),
X(Id2,intD,date2,double3,xxxxx,yyyy)))
(Id3,ArrayBuffer(X(Id3,intD,date3,double4,xxxxx,yyyy),
X(Id3,intE,date3,double5,xxxxx,yyyy)))
...
Учитывая это, я хочу выполнить вычисления по некоторым из этих значений и создать класс Y для каждого идентификатора, например это:
Y(id, avg(X(Elem4)), min(X(Elem4)), max(X(Elem4)), sum(X(Elem2)), count_Xs_for_ID)
Идея состоит в том, чтобы наконец применить эту функцию к DStream, используя mapWithState (более производительный, как я знал, чем updateStateByKey):
val stateDStream = DStream.mapWithState(StateSpec.function(updateState))
Я видел метод "zip", который применяется к 2 массивам, но мне не кажется подходящим для этой цели; возможно, карта, примененная к параметру Xs, которая использует функцию, применяемую для каждого класса X, могла бы помочь, но я немного потерялся, возможно, я становлюсь сложным для образца темы, любого, кто может дать мне несколько подсказок или правильно направить мне, чтобы добиться этого?
Спасибо за ваше время, JL