Вот мое понимание ваших требований, основанное на описании вашего вопроса и комментарии:
Цикл по строке RDD с collect
-изложением, и всякий раз, когда nd
в текущей строке меньшебольше или равно ndLimit
, извлеките ts
из предыдущей строки и сбросьте ndLimit
в значение nd
из той же строки.
Если это правильно, я бы предложил использоватьfoldLeft
для составления списка временных меток, как показано ниже:
import org.apache.spark.sql.Row
val s_n181n = Seq(
(1, "a1", 101L, "b1", 1.0), // nd 1.0 is the initial limit
(2, "a2", 102L, "b2", 1.6),
(3, "a3", 103L, "b3", 1.2),
(4, "a4", 104L, "b4", 0.8), // 0.8 <= 1.0, hence ts 103 is saved and nd 1.2 is the new limit
(5, "a5", 105L, "b5", 1.5),
(6, "a6", 106L, "b6", 1.3),
(7, "a7", 107L, "b7", 1.1), // 1.1 <= 1.2, hence ts 106 is saved and nd 1.3 is the new limit
(8, "a8", 108L, "b8", 1.2) // 1.2 <= 1.3, hence ts 107 is saved and nd 1.1 is the new limit
).toDF("c1", "c2", "ts", "c4", "nd")
val s_rows = s_n181n.rdd.collect
val s_list = s_rows.map(r => (r.getAs[Long](2), r.getAs[Double](4))).toList
// List[(Long, Double)] = List(
// (101,1.0), (102,1.6), (103,1.2), (104,0.8), (105,1.5), (106,1.3), (107,1.1), (108,1.2)
// )
val ndLimit = s_list.head._2 // 1.0
s_list.tail.foldLeft( (s_list.head._1, s_list.head._2, ndLimit, List.empty[Long]) ){
(acc, x) =>
if (x._2 <= acc._3)
(x._1, x._2, acc._2, acc._1 :: acc._4)
else
(x._1, x._2, acc._3, acc._4)
}._4.reverse
// res1: List[Long] = List(103, 106, 107)
Обратите внимание, что кортеж ( previous ts, previous nd, current ndLimit, list of timestamps )
используется в качестве аккумулятора для переноса элементов из предыдущей строки для необходимой логики сравнения втекущая строка.