как извлечь значение первого столбца последней строки в кадре данных spark scala внутри цикла for и if - PullRequest
0 голосов
/ 12 октября 2018

s_n181n - это фрейм данных, и здесь я прохожу 3-й и 5-й столбец строки фрейма данных

и

, где столбец nd равен <=1.0, он разбиваеткод

ts(timestamp) | nd (nearest distance)

- это выходные столбцы, показанные выше

But what i need is the timestamp of last row value i.e 1529157727000 

Я хочу разорвать цикл, показывающий последнее значениецикла здесь.Как сохранить значение метки времени последней строки в переменной, чтобы за пределами этого цикла я мог ее использовать.

1 Ответ

0 голосов
/ 12 октября 2018

Вот мое понимание ваших требований, основанное на описании вашего вопроса и комментарии:

Цикл по строке RDD с collect -изложением, и всякий раз, когда nd в текущей строке меньшебольше или равно ndLimit, извлеките ts из предыдущей строки и сбросьте ndLimit в значение nd из той же строки.

Если это правильно, я бы предложил использоватьfoldLeft для составления списка временных меток, как показано ниже:

import org.apache.spark.sql.Row

val s_n181n = Seq(
  (1, "a1", 101L, "b1", 1.0),  // nd 1.0 is the initial limit
  (2, "a2", 102L, "b2", 1.6),
  (3, "a3", 103L, "b3", 1.2),
  (4, "a4", 104L, "b4", 0.8),  // 0.8 <= 1.0, hence ts 103 is saved and nd 1.2 is the new limit
  (5, "a5", 105L, "b5", 1.5),
  (6, "a6", 106L, "b6", 1.3),
  (7, "a7", 107L, "b7", 1.1),  // 1.1 <= 1.2, hence ts 106 is saved and nd 1.3 is the new limit
  (8, "a8", 108L, "b8", 1.2)   // 1.2 <= 1.3, hence ts 107 is saved and nd 1.1 is the new limit
).toDF("c1", "c2", "ts", "c4", "nd")

val s_rows = s_n181n.rdd.collect

val s_list = s_rows.map(r => (r.getAs[Long](2), r.getAs[Double](4))).toList
// List[(Long, Double)] = List(
//   (101,1.0), (102,1.6), (103,1.2), (104,0.8), (105,1.5), (106,1.3), (107,1.1), (108,1.2)
// )

val ndLimit = s_list.head._2  // 1.0

s_list.tail.foldLeft( (s_list.head._1, s_list.head._2, ndLimit, List.empty[Long]) ){
  (acc, x) =>
    if (x._2 <= acc._3)
      (x._1, x._2, acc._2, acc._1 :: acc._4)
    else
      (x._1, x._2, acc._3, acc._4)
}._4.reverse
// res1: List[Long] = List(103, 106, 107)

Обратите внимание, что кортеж ( previous ts, previous nd, current ndLimit, list of timestamps ) используется в качестве аккумулятора для переноса элементов из предыдущей строки для необходимой логики сравнения втекущая строка.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...