Как указать или выбрать ячейку в кадре данных, Spark - Scala - PullRequest
0 голосов
/ 27 сентября 2018

Я хочу найти разницу во времени в 2 ячейки.

С arrays в python я бы сделал for loop st[i+1] - st[i] и где-то сохранил бы результаты.

У меня есть этот фрейм данных, отсортированный по времени.Как я могу сделать это с Spark 2 или Scala, достаточно псевдокода.

+--------------------+-------+
|                  st|   name|
+--------------------+-------+
|15:30               |dog    |
|15:32               |dog    |
|18:33               |dog    |
|18:34               |dog    |
+--------------------+-------+

Ответы [ 3 ]

0 голосов
/ 27 сентября 2018

Если вы специально хотите различать смежные элементы в коллекции, тогда в Scala я бы сжал коллекцию с хвостом, чтобы получить коллекцию, содержащую кортежи смежных пар.

К сожалению, нет метода хвостана RDD или DataFrames / Sets

Вы можете сделать что-то вроде:

val a = myDF.rdd
val tail = myDF.rdd.zipWithIndex.collect{
  case (index, v) if index > 1 => v}

a.zip(tail).map{ case (l, r) => /* diff l and r st column */}.collect
0 голосов
/ 27 сентября 2018

Если скользящие diff s должны быть вычислены для каждого раздела с помощью name, я бы использовал lag() оконную функцию:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val df = Seq(
  ("a", 100), ("a", 120),
  ("b", 200), ("b", 240), ("b", 270)
).toDF("name", "value")

val window = Window.partitionBy($"name").orderBy("value")

df.
  withColumn("diff", $"value" - lag($"value", 1).over(window)).
  na.fill(0).
  orderBy("name", "value").
  show
// +----+-----+----+
// |name|value|diff|
// +----+-----+----+
// |   a|  100|   0|
// |   a|  120|  20|
// |   b|  200|   0|
// |   b|  240|  40|
// |   b|  270|  30|
// +----+-----+----+

С другой стороны, если скользящая diff s должны быть вычислены по всему набору данных, функция Window без разделения не будет масштабироваться, поэтому я прибегну к использованию функции sliding() RDD:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.mllib.rdd.RDDFunctions._

val rdd = df.rdd

val diffRDD = rdd.sliding(2).
  map{ case Array(x, y) => Row(y.getString(0), y.getInt(1), y.getInt(1) - x.getInt(1)) }

val headRDD = sc.parallelize(Seq(Row.fromSeq(rdd.first.toSeq :+ 0)))

val headDF = spark.createDataFrame(headRDD, df.schema.add("diff", IntegerType))
val diffDF = spark.createDataFrame(diffRDD, df.schema.add("diff", IntegerType))

val resultDF = headDF union diffDF
resultDF.show
// +----+-----+----+
// |name|value|diff|
// +----+-----+----+
// |   a|  100|   0|
// |   a|  120|  20|
// |   b|  200|  80|
// |   b|  240|  40|
// |   b|  270|  30|
// +----+-----+----+
0 голосов
/ 27 сентября 2018

Что-то вроде:

object Data1 {

  import org.apache.log4j.Logger
  import org.apache.log4j.Level

  Logger.getLogger("org").setLevel(Level.OFF)
  Logger.getLogger("akka").setLevel(Level.OFF)

  def main(args: Array[String]) : Unit = {
    implicit val spark: SparkSession =
      SparkSession
        .builder()
        .appName("Test")
        .master("local[1]")
        .getOrCreate()

    import org.apache.spark.sql.functions.col

    val rows = Seq(Row(1, 1), Row(1, 1), Row(1, 1))
    val schema = List(StructField("int1", IntegerType, true), StructField("int2", IntegerType, true))

    val someDF = spark.createDataFrame(
      spark.sparkContext.parallelize(rows),
      StructType(schema)
    )

    someDF.withColumn("diff", col("int1") - col("int2")).show()
  }
}

дает

+----+----+----+
|int1|int2|diff|
+----+----+----+
|   1|   1|   0|
|   1|   1|   0|
|   1|   1|   0|
+----+----+----+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...