Применить атрибутов операций в Iterable класса case в Scala - PullRequest
0 голосов
/ 17 июня 2020

Я новичок в ie в Spark и особенно в Scala, поэтому любая помощь приветствуется. У меня есть Iterable класса case X, который является параметром для функции обновления состояния, в которой состояние определяется Tuple2 со строкой и классом case Y.

val updateState = (id: String, Xs: Option[Iterable[X]], state: State[Tuple2[String,Y]]) {
    ...
}

, где определения классы case X и Y следующие:

case class X(Elem1: String, Elem2: int, Elem3: Date, Elem4: Double, Elem5: String, Elem6: String)
case class Y(Elem1: String, Elem2: Double, Elem3: Double, Elem4: Double, Elem5: Int)

Допустим, класс case Y будет хранить значения результатов для некоторых операций, которые должны применяться поэлементно в значениях атрибутов для всех классов X принадлежащий к тому же Id; чтобы лучше это представить, позвольте мне использовать пример с моим форматом DStream:

val DStream = (Id1,ArrayBuffer(X(Id1,intA,date1,double1,xxxxx,yyyy)))
              (Id2,ArrayBuffer(X(Id2,intB,date2,double2,xxxxx,yyyy), 
                               X(Id2,intC,date2,double2,xxxxx,yyyy), 
                               X(Id2,intC,date2,double2,xxxxx,yyyy),
                               X(Id2,intD,date2,double3,xxxxx,yyyy)))
              (Id3,ArrayBuffer(X(Id3,intD,date3,double4,xxxxx,yyyy),
                               X(Id3,intE,date3,double5,xxxxx,yyyy)))
              ...

Учитывая это, я хочу выполнить вычисления по некоторым из этих значений и создать класс Y для каждого идентификатора, например это:

Y(id, avg(X(Elem4)), min(X(Elem4)), max(X(Elem4)), sum(X(Elem2)), count_Xs_for_ID)

Идея состоит в том, чтобы наконец применить эту функцию к DStream, используя mapWithState (более производительный, как я знал, чем updateStateByKey):

val stateDStream = DStream.mapWithState(StateSpec.function(updateState))

Я видел метод "zip", который применяется к 2 массивам, но мне не кажется подходящим для этой цели; возможно, карта, примененная к параметру Xs, которая использует функцию, применяемую для каждого класса X, могла бы помочь, но я немного потерялся, возможно, я становлюсь сложным для образца темы, любого, кто может дать мне несколько подсказок или правильно направить мне, чтобы добиться этого?

Спасибо за ваше время, JL

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...