Фрейм данных Spark 1.6 - PullRequest
       3

Фрейм данных Spark 1.6

0 голосов
/ 13 ноября 2018

Дамп данных

Work_Id,Assigned_to,Date,Status   
R1,John,3/4/15,Not Started   
R1,John,3/5/15,In Progress        
R1,John,3/6/15,Finished     
R3,Alaxender,3/7/15,In Progress   
R3,Alaxender,3/8/15,In Progress   
R4,Patrick,3/9/15,Finished   
R5,Peter,3/11/15,Finished   
R7,George,3/13/15,Not Started   
R7,George,3/14/15,In Progress   
R8,John,3/15/15,In Progress    
R8,John,3/16/15,In Progress   
R9,Alaxender,3/17/15,Not Started

Окончательный вывод

Work_Id,Assigned_to,Date,Status   
R1,John,3/6/15,Finished    
R7,George,3/14/15,In Progress    
R9,Alaxender,3/17/15,Not Started    
R3,Alaxender,3/7/15,In Progress    
R3,Alaxender,3/8/15,In Progress    
R4,Patrick,3/9/15,Finished    
R5,Peter,3/11/15,Finished    
R8,John,3/15/15,In Progress    
R8,John,3/16/15,In Progress 

Существует такой же набор данных, как и выше, который состоит из рабочих заданий. Если есть последующий запрос для того же человека, и статус имеет статус «Не начато», тогда будет подтверждена последняя запись (сортировка по дате). если существует только одна запись со статусом «Не начато», то эта запись будет квалифицирована.

Например:

R1,John,3/4/15,Not Started    
R1,John,3/5/15,In Progress   
R1,John,3/6/15,Finished   

Эта запись будет квалифицирована

R1,John,3/6/15,Finished

Остальные все записи, кроме статуса «Не начато» для одного и того же человека, будут квалифицированы в выводе.

Буду признателен за любую помощь, чтобы сделать это в кадре данных Spark 1.6 с использованием scala.

1 Ответ

0 голосов
/ 14 ноября 2018

У меня есть ответ, но в настоящее время это снижает производительность труда. Есть ли лучший способ сделать это?

val df = myFile.toDF()

val dfFilter = df.filter($"status" === "Not Started")

val dfSelect = dfFilter.select(($"Assigned_to").alias("person"))

val dfInner = dfSelect.join(df, $"person" === $"Assigned_to")

val windowSpec = Window.partitionBy($"Assigned_to").orderBy(col("Date").desc)

val dfRank = dfInner.withColumn("rank", rank().over(windowSpec)).filter($"rank" === "1")

val dfDrop = dfRank.drop($"rank").drop($"person")

val dfLeftOuter = df.join(dfSelect, $"Assigned_to" === $"person", "leftouter")

val nullDf = dfLeftOuter.filter($"person".isNull).drop($"person")

nullDf.unionAll(dfDrop).show
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...