Spark Scala DataFrame для цикла - PullRequest
       10

Spark Scala DataFrame для цикла

0 голосов
/ 11 октября 2018

мой входной фрейм данных выглядит следующим образом:

    index    bucket    time    ap   station    rssi
    0         1        00:00   1       1       -84.0
    1         1        00:00   1       3       -67.0
    2         1        00:00   1       4       -82.0
    3         1        00:00   1       2       -68.0
    4         2        00:15   1       3       -83.0
    5         2        00:15   1       2       -82.0
    6         2        00:15   1       4       -80.0
    7         2        00:15   1       1       -72.0
    8         3        00:30   1       4       -85.0
    9         3        00:30   1       3       -77.0
    10        3        00:30   1       2       -70.0

Я новичок в Scala Spark, и я хотел бы зациклить данные следующим образом:

for each ap 
   for each station 
      for each bucket 
         if rssi(previous bucket)<rssi(bucket)
         print message

Вотначало моего приложения spark:

object coveralg {

    def main(args: Array[String]) {

        val spark = SparkSession.builder().appName("coveralg").getOrCreate()
        import spark.implicits._
        val input_data =  spark.read.format("csv").option("header","true").load(args(0))

    }
}

, но я не знаю, как реализовать цикл над фреймом данных и выбрать значения, чтобы сделать if

1 Ответ

0 голосов
/ 11 октября 2018

DataFrame не предназначены для этого.Они предназначены для применения одного и того же преобразования к каждой записи или уменьшения их.Вы можете добавить столбец с Boolean, реализующим ваш if:

import org.apache.spark.sql.expressions.Window

val w = Window.partitionBy($"ap",$"station",$"bucket").
        orderBy(unix_timestamp($"time")).
        rangeBetween(Long.MinValue, -1)

val df = input_data.withColumn("shouldPrintMessage",when(max($"rssi".over(w))>$"rssi",true))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...