Фильтрация данных с использованием hashmap - PullRequest
0 голосов
/ 29 декабря 2018

У меня есть хэш-карта, в которой я сохранил значения

Map(862304021470656 -> List(0.0, 0.0, 0.0, 0.0, 1.540980096E9, 74.365111, 22.302669, 0.0),866561010400483 -> List(0.0, 1.0, 1.0, 2.0, 1.543622306E9, 78.0204, 10.005262, 56.0))

Это фрейм данных

|             id|       lt|       ln|       evt|    lstevt|  s|  d|agl|chg| d1| d2| d3| d4|ebt|ibt|port| a1| a2| a3| a4|nos|dfrmd|
+---------------+---------+---------+----------+----------+---+---+---+---+---+---+---+---+---+---+----+---+---+---+---+---+-----+
|862304021470656|25.284158|82.435973|1540980095|1540980095|  0| 39|298|  0|  0|  1|  1|  2|  0|  5|  97| 12| -1| -1| 22|  0|    0|
|862304021470656|25.284158|82.435973|1540980105|1540980105|  0|  0|298|  0|  0|  1|  1|  2|  0|  5|  97| 12| -1| -1| 22|  0|    0|
|862304021470656|25.284724|82.434222|1540980155|1540980155| 14| 47|289|  0|  0|  1|  1|  2|  0|  5|  97| 11| -1| -1| 22|  0|    0|
|866561010400483|25.284858|82.433831|1544980165|1540980165| 12| 42|295|  0|  0|  1|  1|  2|  0|  5|  97| 12| -1| -1| 22|  0|    0|

Я хочу просто отфильтровать эти значения из фрейма данных, сравнивая 4-й индексlist из столбца evt, выбирая только те строки, значение evt которых превышает 4-е значение индекса списка, ключом на карте является столбец id из dataframe.

Ответы [ 2 ]

0 голосов
/ 30 декабря 2018

Вы можете получить это с помощью простого sql:

import spark.implicits._
import org.apache.spark.sql.functions._
val df = ... //your main Dataframe
val map = Map(..your data here..).toDF("id", "list")
val join = df.join(map, "id").filter(length($"list") >= 5 /* <-- just in case */)
val res = join.filter($"evt" > $"list"(4))
0 голосов
/ 29 декабря 2018

Вот один из способов использования UDF для получения значения evt для сравнения:

import org.apache.spark.sql.functions._

val df = Seq(
  (862304021470656L, 25.284158, 82.435973, 1540980095),
  (862304021470656L, 25.284158, 82.435973, 1540980105),
  (862304021470656L, 25.284724, 82.434222, 1540980155),
  (866561010400483L, 25.284858, 82.433831, 1544980165)
).toDF("id", "lt", "ln", "evt")

val listMap = Map(
  862304021470656L -> List(0.0, 0.0, 0.0, 0.0, 1.540980096E9, 74.365111, 22.302669, 0.0),
  866561010400483L -> List(0.0, 1.0, 1.0, 2.0, 1.543622306E9, 78.0204, 10.005262, 56.0)
)

def evtLimit(m: Map[Long, List[Double]], evtIdx: Int) = udf(
  (id: Long) => m.get(id) match {
      case Some(ls) => if (evtIdx < ls.size) ls(evtIdx) else Double.MaxValue
      case None => Double.MaxValue
    }
)

df.where($"evt" > evtLimit(listMap, 4)($"id")).show
// +---------------+---------+---------+----------+
// |             id|       lt|       ln|       evt|
// +---------------+---------+---------+----------+
// |862304021470656|25.284158|82.435973|1540980105|
// |862304021470656|25.284724|82.434222|1540980155|
// |866561010400483|25.284858|82.433831|1544980165|
// +---------------+---------+---------+----------+

Обратите внимание, что UDF возвращает Double.MaxValue в случае несовпадения ключа или недопустимого значения в предоставленной карте,Это, безусловно, может быть пересмотрено для конкретных бизнес-требований.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...