Я использую Scala, но решение, которое я придумала, это
- Используйте оконные функции, чтобы найти номер последней строки с is_skill = true перед строкой, где parent_Id не равен предыдущему parent_Id. - Выполните самостоятельное объединение (self-join) фрейма данных для сопоставления строк.
Требуется ли следующий вывод?
+------+------+-------+--------+--------------------+--------+--------+---------------+--------+
|rownum|viewid|skillid|parentId| post_timestamp|is_skill|column A|matchedParentId|isAEqual|
+------+------+-------+--------+--------------------+--------+--------+---------------+--------+
| 1| 251| b| xyz12|20190131 09:24:02...| true| abcde| null| true|
| 2| 251| b| abc34|20190131 10:24:02...| false| 453aw| false| false|
| 3| 251| b| abc34|20190131 11:24:02...| false| abcde| true| true|
| 5| 94| a| yui67|20190131 09:06:57...| true| nnnn| false| true|
| 6| 94| a| qwe12|20190131 09:24:02...| false| 2n21q| false| false|
| 7| 94| a| qwe12|20190131 10:06:57...| false| nnnnq| true| false|
| 8| 94| a| rty87|20190131 15:07:57...| true| 1234| false| true|
| 9| 94| a| bnm22|20190131 16:28:05...| true| 1234| false| true|
| 10| 94| a| bnm22|20190131 17:28:05...| true| 6789| true| true|
| 11| 94| b| tyu12|20190131 09:24:02...| true| 6789| null| true|
+------+------+-------+--------+--------------------+--------+--------+---------------+--------+
Вот код:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions
import org.apache.spark.sql.functions._
import spark.implicits._
// Sample input rows: (rownum, viewid, skillid, parentId, post_timestamp, is_skill, "column A")
val inputRows = Seq(
  (1,  251, "b", "xyz12", "20190131 09:24:02.868", true,  "abcde"),
  (2,  251, "b", "abc34", "20190131 10:24:02.868", false, "453aw"),
  (3,  251, "b", "abc34", "20190131 11:24:02.868", false, "abcde"),
  (4,  94,  "a", "ghi23", "20190131 02:28:05.107", false, "bbbbb"),
  (5,  94,  "a", "yui67", "20190131 09:06:57.976", true,  "nnnn"),
  (6,  94,  "a", "qwe12", "20190131 09:24:02.868", false, "2n21q"),
  (7,  94,  "a", "qwe12", "20190131 10:06:57.976", false, "nnnnq"),
  (8,  94,  "a", "rty87", "20190131 15:07:57.976", true,  "1234"),
  (9,  94,  "a", "bnm22", "20190131 16:28:05.107", true,  "1234"),
  (10, 94,  "a", "bnm22", "20190131 17:28:05.107", true,  "6789"),
  (11, 94,  "b", "tyu12", "20190131 09:24:02.868", true,  "6789")
)
val df = inputRows.toDF(
  "rownum", "viewid", "skillid", "parentId", "post_timestamp", "is_skill", "column A")
// Window over each (viewid, skillid) group, ordered by event time. With an
// orderBy and no explicit frame, aggregate functions such as max use the
// default running frame (unbounded preceding .. current row).
val w = Window.partitionBy("viewid", "skillid").orderBy("post_timestamp")

// NOTE(fix): lag/rank/when/max are members of org.apache.spark.sql.functions;
// the bare `import org.apache.spark.sql.functions` does not bring them into
// scope — `import org.apache.spark.sql.functions._` is required.
val df2 = df.
  // true when this row's parentId equals the previous row's parentId
  // (null on the first row of each group, where lag has no input row)
  withColumn("matchedParentId", lag($"parentId", 1).over(w).equalTo($"parentId")).
  // position of the row within its (viewid, skillid) group
  withColumn("rank", rank.over(w)).
  // 1 for skill rows, 0 otherwise — used to mask out non-skill ranks below
  withColumn("is_skill_int", when($"is_skill", 1).otherwise(0)).
  // running max of (flag * rank) = rank of the most recent is_skill row
  // at or before the current row (0 when no skill row has occurred yet,
  // which matches no rank in the later self-join and drops the row)
  withColumn("test", max($"is_skill_int" * $"rank").over(w))
// Self-join: each row (right side) is paired with the row whose rank equals
// its "last skill row" marker (left side), within the same group. Rows with
// no preceding skill row (test = 0) match nothing and are dropped.
val leftSide  = df2.as("df_left")
val rightSide = df2.as("df_right")
val joinCond =
  $"df_left.viewid" === $"df_right.viewid" &&
    $"df_left.skillid" === $"df_right.skillid" &&
    $"df_left.rank" === $"df_right.test"
val df3 = leftSide.join(rightSide, joinCond).
  // compare "column A" of the current row with that of its matched skill row
  withColumn("isAEqual", $"df_left.column A" === $"df_right.column A").
  select(
    "df_right.rownum", "df_right.viewid", "df_right.skillid",
    "df_right.parentId", "df_right.post_timestamp", "df_right.is_skill",
    "df_right.column A", "df_right.matchedParentId", "isAEqual").
  orderBy("rownum")
df3.show