Scala - нужна помощь в доступе к каждой строке определенного столбца, например, x (i + 1) и x (i-1) - PullRequest
0 голосов
/ 11 июня 2019

У меня есть фрейм данных spark, в котором у меня есть несколько столбцов, таких как tin, year, date_begin, date_end, непрерывный_data

    tin   year    continuous_data
    a1    2017          0
    a1    2017          1
    a1    2017          0
    a1    2017          1
    a1    2017          1
    a1    2017          0
    a1    2017          1
    a1    2017          1
    a1    2017          1
    a1    2017          0
    a1    2017          1

аналогично, у меня есть еще 2 столбца, которые имеют формат даты и времени как (гггг-мм-дд ЧЧ: мм: сс).

Мне нужно получить доступ к каждой строке столбца непрерывных данных, например, x (i + 1) и x (i-1). В моем случае это как

непрерывные_данные (i) - текущее значение строки
непрерывные_данные (i-1) - предыдущее значение строки
непрерывные_данные (i + 1) - значение следующей строки

чтобы моя потребность была как ниже

    tin   year    continuous_data    prev_data    next_data
    a1    2017          0                null        1
    a1    2017          1                0           0
    a1    2017          0                1           1
    a1    2017          1                0           1    
    a1    2017          1                1           0
    a1    2017          0                1           1
    a1    2017          1                0           1
    a1    2017          1                1           1
    a1    2017          1                1           0
    a1    2017          0                1           1
    a1    2017          1                0           null

Мне нужно решить это в чистом Scala, вместо использования функций искры, где я достиг этого, используя оконные функции, которые по некоторым причинам не требуются.
Я пытаюсь решить это за последние несколько дней, но пока не могу решить. Может ли кто-нибудь помочь мне в решении этой проблемы.

1 Ответ

0 голосов
/ 12 июня 2019

Если вам нужно обрабатывать операции на основе окна Spark без использования функций spark Window и Spark sql, вы можете сделать это с помощью UDAF. Работа с UDAF и UDF - это то, что в некоторых блогах не рекомендуется использовать, если в этом нет необходимости. Но если вы можете позволить себе потерять некоторую производительность и, возможно, большие паузы в GC, вы можете попытаться поиграть с вашими собственными преобразованиями / агрегациями Spark.

Пример:

предположим, что вы хотите выполнить, и некоторое окно слайдов в вашем наборе данных, которое может быть представлено как:

item: String, key: String, timestamp: Long, field1:String, field2:Int, field3:Int, field4:Int

И вы хотите, например, реализовать приращение field2 в качестве нового поля вашего фрейма данных, и вы хотите сделать это без использования Spark sql и вам нужно использовать систему типов Scala, например, вы хотите выполнить операцию между двумя строками, используя экземпляр Monoid . Возможно, в этом случае лучше работать напрямую с RDD ... Ниже приведен пример работы с Dataframe api.

Одновременная работа с фреймами данных и типами Scala несколько обременительна, поскольку приходится иметь дело с обоими типами семейств:

Вы должны реализовать абстрактные члены UDAF:

class GenericAggregate(id: StringType, in: IntegerType, sort: LongType, output:IntegerType)(f: Seq[(String, Int)] => Seq[(String, Int)]) {

  private val mapType = MapType(id, MapType(sort, output, true), true)

  // This is the schema for your UDAF. The aggregation needs three fields from the input dataframe
  override def inputSchema: StructType =
    StructType(
      StructField("id_schema0", id) :: StructField("sort_schema0", sort) :: StructField(
        "input_schema0",
        in) :: Nil)

   // This is the internal fields you keep for computing your aggregate. 
  override def bufferSchema: StructType =
    StructType(StructField("internal_buffer", mapType) :: Nil)
  }

  // This kind of aggregation returns a key-value: key -> delta
  override def dataType: DataType = MapType(id, output)

  override def deterministic: Boolean = true

  // This is the initial value for your buffer schema.
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
   buffer(0) = Map("" -> 0)
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getAs[Map[String, Map[Long, Int]]](0) + (input.getAs[String](0) -> Map(
    input.getAs[Long](1) -> input.getAs[Int](2)))
  }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getAs[Map[String, Map[Long, Int]]](0) ++ buffer2
     .getAs[Map[String, Map[Long, Int]]](0)
  }


  override def evaluate(buffer: Row): Any = {
    val map = buffer.getAs[Map[String, Map[Long, Int]]](0)

    // You need to create a Seq from the map         

    val toSeq0 = map.mapValues(_.head)
    // As window functions you must order your events before applying the function
    val toSeq1 = toSeq0.toSeq.sortBy(_._2._1)
    val sequence = toSeq1.map(el => (el._1, el._2._2))

    /*
    For example, if your internal map is val in = 
        Map("ke1" -> Map(1L -> 3), "key2" -> Map(2L -> 3))
        you will get ArrayBuffer((ke1,3), (key2,3))
    */

    val result = f(sequence)  

    /*
       As a result you will have another ArrayBuffer with your new data
       Map("ke1" -> Map(1L -> 1), "key2" -> Map(2L -> 1))
    */
    result.toMap - k.initKey
  }

}

В этом примере карта используется для создания агрегации, но вам необходимо предоставить функцию, которая работает с этой коллекцией и возвращает новое поле.

Почему это необходимо? Ну, в случае, если вам нужно абстрагироваться или создать DSL поверх Spark Sql с более сложными типами, вы можете использовать деривацию классов типов для создания пользовательских оконных / агрегатных функций для ваших продуктов. Но, как я уже говорил, работа напрямую с функциями Spark SQL настоятельно рекомендуется в большинстве случаев . Эти альтернативы помогают лучше понять, как работает Spark, и дают возможность поиграть с компилятором, чтобы построить более общие конвейеры данных или даже создать DSL, которые могут быть выполнены через Spark Sql.

Надеюсь, это поможет.

...